From 599d15942e7c2c4882333e968fb996ff6844e551 Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Wed, 5 Jun 2019 16:53:03 +0800 Subject: [PATCH] DefaultFuture --- .../dubbo/demo/provider/DemoServiceImpl.java | 10 ------ .../dubbo/registry/dubbo/MockChannel.java | 8 ++--- .../dubbo/registry/dubbo/MockedClient.java | 14 ++++---- .../remoting/exchange/ExchangeChannel.java | 8 ++--- .../exchange/support/DefaultFuture.java | 36 +++++++++++-------- .../support/header/HeaderExchangeChannel.java | 18 +++++----- .../support/header/HeaderExchangeClient.java | 20 +++++------ .../java/org/apache/dubbo/remoting/Main.java | 10 +++--- .../remoting/PerformanceClientFixedTest.java | 5 ++- .../dubbo/remoting/PerformanceClientTest.java | 9 +++-- .../exchange/support/DefaultFutureTest.java | 9 +++-- .../header/HeaderExchangeChannelTest.java | 9 ++--- .../transport/mina/ClientToServerTest.java | 2 +- .../transport/netty/ClientToServerTest.java | 2 +- .../transport/netty4/ClientToServerTest.java | 2 +- .../protocol/dubbo/ChannelWrappedInvoker.java | 11 +----- .../rpc/protocol/dubbo/DubboInvoker.java | 4 +-- .../dubbo/LazyConnectExchangeClient.java | 18 +++++----- .../dubbo/ReferenceCountExchangeClient.java | 16 ++++----- .../rpc/protocol/thrift/ThriftInvoker.java | 4 +-- .../rpc/protocol/thrift/ThriftCodecTest.java | 5 +-- 21 files changed, 108 insertions(+), 112 deletions(-) diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java index 5e57f77e51e..bceeae202de 100644 --- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java +++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java @@ -30,22 +30,12 @@ public class DemoServiceImpl implements DemoService { @Override public String sayHello(String name) { logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress(); } @Override public CompletableFuture sayHelloAsync(String name) { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } return "async result"; }); return cf; diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java index e563da616ce..9c433591725 100644 --- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java +++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java @@ -86,21 +86,21 @@ public ChannelHandler getChannelHandler() { return null; } - public CompletableFuture request(Object request) throws RemotingException { + public CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException { return null; } - public CompletableFuture request(Object request, int timeout) throws RemotingException { + public CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { return null; } @Override - public CompletableFuture request(Object request, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { return null; } @Override - public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { return null; } diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java index ad06cd20960..bd62d3a62f0 100644 --- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java +++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java @@ -81,21 +81,21 @@ public void send(Object msg) throws RemotingException { this.sent = msg; } - public CompletableFuture request(Object msg) throws RemotingException { - return request(msg, null); + public CompletableFuture request(Object msg, CompletableFuture completableFuture) throws RemotingException { + return request(msg, null, completableFuture); } - public CompletableFuture request(Object msg, int timeout) throws RemotingException { - return this.request(msg, timeout, null); + public CompletableFuture request(Object msg, int timeout, CompletableFuture completableFuture) throws RemotingException { + return this.request(msg, timeout, null, completableFuture); } @Override - public CompletableFuture request(Object msg, ExecutorService executor) throws RemotingException { - return this.request(msg, 0, executor); + public CompletableFuture request(Object msg, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return this.request(msg, 0, executor, completableFuture); } @Override - public CompletableFuture request(Object msg, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object msg, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { this.invoked = msg; return new CompletableFuture() { public Object get() throws InterruptedException, ExecutionException { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java index c0cf131d8e0..48a9f6ff2fc 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java @@ -35,7 +35,7 @@ public interface ExchangeChannel extends Channel { * @throws RemotingException */ @Deprecated - CompletableFuture request(Object request) throws RemotingException; + CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -46,7 +46,7 @@ public interface ExchangeChannel extends Channel { * @throws RemotingException */ @Deprecated - CompletableFuture request(Object request, int timeout) throws RemotingException; + CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -55,7 +55,7 @@ public interface ExchangeChannel extends Channel { * @return response future * @throws RemotingException */ - CompletableFuture request(Object request, ExecutorService executor) throws RemotingException; + CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -65,7 +65,7 @@ public interface ExchangeChannel extends Channel { * @return response future * @throws RemotingException */ - CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException; + CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException; /** * get message handler. diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 0abecf20301..a23e8cd9199 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -44,7 +45,7 @@ /** * DefaultFuture. */ -public class DefaultFuture extends CompletableFuture { +public class DefaultFuture { private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class); @@ -52,6 +53,9 @@ public class DefaultFuture extends CompletableFuture { private static final Map FUTURES = new ConcurrentHashMap<>(); + // private static final Map PENDING_TASKS = new ConcurrentHashMap<>(); + private final CompletableFuture completableFuture; + public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), 30, @@ -66,20 +70,18 @@ public class DefaultFuture extends CompletableFuture { private volatile long sent; private Timeout timeoutCheckTask; - private ExecutorService executor; + private final ExecutorService executor; public ExecutorService getExecutor() { return executor; } - public void setExecutor(ExecutorService executor) { - this.executor = executor; - } - - private DefaultFuture(Channel channel, Request request, int timeout) { + private DefaultFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) { this.channel = channel; this.request = request; this.id = request.getId(); + this.executor = executor; + this.completableFuture = completableFuture; this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); @@ -104,9 +106,8 @@ private static void timeoutCheck(DefaultFuture future) { * @param timeout timeout * @return a new DefaultFuture */ - public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) { - final DefaultFuture future = new DefaultFuture(channel, request, timeout); - future.setExecutor(executor); + public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) { + final DefaultFuture future = new DefaultFuture(channel, request, timeout, executor, completableFuture); // timeout check timeoutCheck(future); return future; @@ -176,7 +177,6 @@ public static void received(Channel channel, Response response, boolean timeout) } } - @Override public boolean cancel(boolean mayInterruptIfRunning) { Response errorResult = new Response(id); errorResult.setStatus(Response.CLIENT_ERROR); @@ -187,6 +187,14 @@ public boolean cancel(boolean mayInterruptIfRunning) { return true; } + public boolean isDone() { + return this.completableFuture.isDone(); + } + + public Object get() throws ExecutionException, InterruptedException { + return this.completableFuture.get(); + } + public void cancel() { this.cancel(true); } @@ -196,11 +204,11 @@ private void doReceived(Response res) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { - this.complete(res.getResult()); + this.completableFuture.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { - this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); + this.completableFuture.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { - this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); + this.completableFuture.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java index b95461294f1..a82bf531710 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java @@ -106,22 +106,22 @@ public void send(Object message, boolean sent) throws RemotingException { } @Override - public CompletableFuture request(Object request) throws RemotingException { - return request(request, null); + public CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException { + return request(request, null, completableFuture); } @Override - public CompletableFuture request(Object request, int timeout) throws RemotingException { - return request(request, timeout, null); + public CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return request(request, timeout, null, completableFuture); } @Override - public CompletableFuture request(Object request, ExecutorService executor) throws RemotingException { - return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor); + public CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor, completableFuture); } @Override - public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } @@ -130,14 +130,14 @@ public CompletableFuture request(Object request, int timeout, ExecutorSe req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); - DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); + DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor, completableFuture); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } - return future; + return completableFuture; } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 6671d1a30b0..8e04b99bbb1 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -34,11 +34,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat; -import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout; import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK; import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION; import static org.apache.dubbo.remoting.Constants.TICKS_PER_WHEEL; +import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat; +import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout; /** * DefaultMessageClient @@ -66,8 +66,8 @@ public HeaderExchangeClient(Client client, boolean startTimer) { } @Override - public CompletableFuture request(Object request) throws RemotingException { - return channel.request(request); + public CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, completableFuture); } @Override @@ -81,18 +81,18 @@ public InetSocketAddress getRemoteAddress() { } @Override - public CompletableFuture request(Object request, int timeout) throws RemotingException { - return channel.request(request, timeout); + public CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, timeout, completableFuture); } @Override - public CompletableFuture request(Object request, ExecutorService executor) throws RemotingException { - return channel.request(request, executor); + public CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, executor, completableFuture); } @Override - public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException { - return channel.request(request, timeout, executor); + public CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, timeout, executor, completableFuture); } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java index 4478f86ecfd..9869bcc91fa 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java @@ -62,7 +62,7 @@ static void dataPackageTest(int port) throws Exception { sb.append("(" + random.nextLong() + ")"); Main.Data d = new Main.Data(); d.setData(sb.toString()); - client.request(d).get(); + client.request(d, new CompletableFuture()).get(); } System.out.println("send finished."); } @@ -83,18 +83,18 @@ public void run() { private static void test(int port) throws Exception { ExchangeChannel client = Exchangers.connect(URL.valueOf("dubbo://localhost:" + port)); - MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class[]{int.class, int.class}, new Object[]{55, 25})).get(); + MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class[]{int.class, int.class}, new Object[]{55, 25}), new CompletableFuture()).get(); System.out.println("55+25=" + result.getResult()); for (int i = 0; i < 100; i++) - client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class[]{String.class}, new Object[]{"qianlei" + i})); + client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class[]{String.class}, new Object[]{"qianlei" + i}), new CompletableFuture()); for (int i = 0; i < 100; i++) - client.request(new Main.Data()); + client.request(new Main.Data(), new CompletableFuture()); System.out.println("=====test invoke====="); for (int i = 0; i < 100; i++) { - CompletableFuture future = client.request(new Main.Data()); + CompletableFuture future = client.request(new Main.Data(), new CompletableFuture()); System.out.println("invoke and get"); System.out.println("invoke result:" + future.get()); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java index 93ccc76e515..ab0228fc2dc 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Random; +import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; @@ -120,7 +121,9 @@ public void testClient() throws Exception { int index = rd.nextInt(connectionCount); ExchangeClient client = arrays.get(index); // ExchangeClient client = arrays.get(0); - String output = (String) client.request(messageBlock).get(); + CompletableFuture cf = new CompletableFuture(); + client.request(messageBlock, cf); + String output = (String) cf.get(); if (output.lastIndexOf(messageBlock) < 0) { System.out.println("send messageBlock;get " + output); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java index f4dfbc2a3e0..b86b17d4005 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java @@ -28,6 +28,7 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -72,8 +73,9 @@ public void testClient() throws Throwable { exchangeClients[i] = Exchangers.connect(url); } - List serverEnvironment = (List) exchangeClients[0].request("environment").get(); - List serverScene = (List) exchangeClients[0].request("scene").get(); + CompletableFuture completableFuture = new CompletableFuture<>(); + List serverEnvironment = (List) exchangeClients[0].request("environment", completableFuture).get(); + List serverScene = (List) exchangeClients[0].request("scene", completableFuture).get(); // Create some data for test StringBuilder buf = new StringBuilder(length); @@ -101,7 +103,8 @@ public void run() { count.incrementAndGet(); ExchangeClient client = exchangeClients[index.getAndIncrement() % connections]; long start = System.currentTimeMillis(); - String result = (String) client.request(data).get(); + CompletableFuture completableFuture = new CompletableFuture<>(); + String result = (String) client.request(data, completableFuture).get(); long end = System.currentTimeMillis(); if (!data.equals(result)) { throw new IllegalStateException("Invalid result " + result); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java index 0f19d157a7b..01db770f644 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java @@ -27,6 +27,7 @@ import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; public class DefaultFutureTest { @@ -95,7 +96,8 @@ public void timeoutSend() throws Exception { // timeout after 5 seconds. Channel channel = new MockedChannel(); Request request = new Request(10); - DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null); + CompletableFuture cf = new CompletableFuture(); + DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null, cf); //mark the future is sent DefaultFuture.sent(channel, request); while (!f.isDone()) { @@ -106,7 +108,7 @@ public void timeoutSend() throws Exception { // get operate will throw a timeout exception, because the future is timeout. try { - f.get(); + cf.get(); } catch (Exception e) { Assertions.assertTrue(e.getCause() instanceof TimeoutException, "catch exception is not timeout exception!"); System.out.println(e.getMessage()); @@ -119,7 +121,8 @@ public void timeoutSend() throws Exception { private DefaultFuture defaultFuture(int timeout) { Channel channel = new MockedChannel(); Request request = new Request(index.getAndIncrement()); - return DefaultFuture.newFuture(channel, request, timeout, null); + CompletableFuture completableFuture = new CompletableFuture<>(); + return DefaultFuture.newFuture(channel, request, timeout, null, completableFuture); } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java index 2affe7d5134..873540d2034 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java @@ -29,6 +29,7 @@ import org.mockito.Mockito; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -143,7 +144,7 @@ public void requestTest01() throws RemotingException { Assertions.assertThrows(RemotingException.class, () -> { header.close(1000); Object requestob = new Object(); - header.request(requestob); + header.request(requestob, CompletableFuture.completedFuture(0)); }); } @@ -153,7 +154,7 @@ public void requestTest02() throws RemotingException { header = new HeaderExchangeChannel(channel); when(channel.getUrl()).thenReturn(url); Object requestob = new Object(); - header.request(requestob); + header.request(requestob, CompletableFuture.completedFuture(0)); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Request.class); verify(channel, times(1)).send(argumentCaptor.capture()); Assertions.assertEquals(argumentCaptor.getValue().getData(), requestob); @@ -170,7 +171,7 @@ public void send(Object req) throws RemotingException { }; header = new HeaderExchangeChannel(channel); Object requestob = new Object(); - header.request(requestob, 1000); + header.request(requestob, 1000, CompletableFuture.completedFuture(0)); }); } @@ -191,7 +192,7 @@ public void closeTest() { public void closeWithTimeoutTest02() { Assertions.assertFalse(channel.isClosed()); Request request = new Request(); - DefaultFuture.newFuture(channel, request, 100, null); + DefaultFuture.newFuture(channel, request, 100, null, CompletableFuture.completedFuture(0)); header.close(100); //return directly header.close(1000); diff --git a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java index 6414434f51d..bea1860e830 100644 --- a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java @@ -65,7 +65,7 @@ protected void tearDown() { @Test public void testFuture() throws Exception { - CompletableFuture future = client.request(new World("world")); + CompletableFuture future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java index 267f5694065..45ecd37c2d3 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java @@ -64,7 +64,7 @@ protected void tearDown() throws Exception { @Test public void testFuture() throws Exception { - CompletableFuture future = client.request(new World("world")); + CompletableFuture future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java index 9b8db0027c2..a67f92b23ef 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java @@ -65,7 +65,7 @@ protected void tearDown() { @Test public void testFuture() throws Exception { - CompletableFuture future = client.request(new World("world")); + CompletableFuture future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java index 101851240c0..6758d04082c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java @@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.exchange.ExchangeClient; import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient; import org.apache.dubbo.remoting.transport.ClientDelegate; -import org.apache.dubbo.rpc.AppResponse; import org.apache.dubbo.rpc.AsyncRpcResult; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Result; @@ -34,7 +33,6 @@ import org.apache.dubbo.rpc.support.RpcUtils; import java.net.InetSocketAddress; -import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; @@ -72,15 +70,8 @@ protected Result doInvoke(Invocation invocation) throws Throwable { currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false)); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { - CompletableFuture responseFuture = currentClient.request(inv); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); - responseFuture.whenComplete((appResponse, t) -> { - if (t != null) { - asyncRpcResult.completeExceptionally(t); - } else { - asyncRpcResult.complete((AppResponse) appResponse); - } - }); + currentClient.request(inv, asyncRpcResult); return asyncRpcResult; } } catch (RpcException e) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java index 4cf6259be50..ea7cb235ed9 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java @@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.support.RpcUtils; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.ReentrantLock; @@ -98,8 +97,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); ExecutorService executor = getCallbackExecutor(getUrl(), inv); asyncRpcResult.setExecutor(executor); - CompletableFuture responseFuture = currentClient.request(inv, timeout, executor); - asyncRpcResult.subscribeTo(responseFuture); + currentClient.request(inv, timeout, executor, asyncRpcResult); return asyncRpcResult; } } catch (TimeoutException e) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java index 77eae2cefcf..ffc3dfe56d9 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java @@ -35,8 +35,8 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY; -import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY; import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE; +import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY; /** * dubbo protocol support class. @@ -88,10 +88,10 @@ private void initClient() throws RemotingException { } @Override - public CompletableFuture request(Object request) throws RemotingException { + public CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request); + return client.request(request, completableFuture); } @Override @@ -109,24 +109,24 @@ public InetSocketAddress getRemoteAddress() { } @Override - public CompletableFuture request(Object request, int timeout) throws RemotingException { + public CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, timeout); + return client.request(request, timeout, completableFuture); } @Override - public CompletableFuture request(Object request, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, executor); + return client.request(request, executor, completableFuture); } @Override - public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, timeout, executor); + return client.request(request, timeout, executor, completableFuture); } /** diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index ed3c61b2aff..c25ed2c4b27 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -57,8 +57,8 @@ public void reset(URL url) { } @Override - public CompletableFuture request(Object request) throws RemotingException { - return client.request(request); + public CompletableFuture request(Object request, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, completableFuture); } @Override @@ -77,18 +77,18 @@ public ChannelHandler getChannelHandler() { } @Override - public CompletableFuture request(Object request, int timeout) throws RemotingException { - return client.request(request, timeout); + public CompletableFuture request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, timeout, completableFuture); } @Override - public CompletableFuture request(Object request, ExecutorService executor) throws RemotingException { - return client.request(request, executor); + public CompletableFuture request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, executor, completableFuture); } @Override - public CompletableFuture request(Object request, int timeout, ExecutorService executor) throws RemotingException { - return client.request(request, timeout, executor); + public CompletableFuture request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, timeout, executor, completableFuture); } @Override diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java index 3d0aee9d031..0125dddcb51 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java @@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; @@ -93,8 +92,7 @@ protected Result doInvoke(Invocation invocation) throws Throwable { int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); - CompletableFuture responseFuture = currentClient.request(inv, timeout); - asyncRpcResult.subscribeTo(responseFuture); + currentClient.request(inv, timeout, asyncRpcResult); return asyncRpcResult; } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e); diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java index a8c66a71f85..405550a6879 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; +import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; @@ -133,7 +134,7 @@ public void testDecodeReplyResponse() throws Exception { Request request = createRequest(); - DefaultFuture future = DefaultFuture.newFuture(channel, request, 10); + DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture()); TMessage message = new TMessage("echoString", TMessageType.REPLY, ThriftCodec.getSeqId()); @@ -210,7 +211,7 @@ public void testDecodeExceptionResponse() throws Exception { Request request = createRequest(); - DefaultFuture future = DefaultFuture.newFuture(channel, request, 10); + DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture()); TMessage message = new TMessage("echoString", TMessageType.EXCEPTION, ThriftCodec.getSeqId());