Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Default future from jeff-lv #4257

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,12 @@ public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}

@Override
public CompletableFuture<String> sayHelloAsync(String name) {
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "async result";
});
return cf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ public ChannelHandler getChannelHandler() {
return null;
}

public CompletableFuture<Object> request(Object request) throws RemotingException {
public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException {
return null;
}

public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException {
return null;
}

@Override
public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return null;
}

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,21 @@ public void send(Object msg) throws RemotingException {
this.sent = msg;
}

public CompletableFuture<Object> request(Object msg) throws RemotingException {
return request(msg, null);
public CompletableFuture<Object> request(Object msg, CompletableFuture completableFuture) throws RemotingException {
return request(msg, null, completableFuture);
}

public CompletableFuture<Object> request(Object msg, int timeout) throws RemotingException {
return this.request(msg, timeout, null);
public CompletableFuture<Object> request(Object msg, int timeout, CompletableFuture completableFuture) throws RemotingException {
return this.request(msg, timeout, null, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object msg, ExecutorService executor) throws RemotingException {
return this.request(msg, 0, executor);
public CompletableFuture<Object> request(Object msg, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return this.request(msg, 0, executor, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor) throws RemotingException {
public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
this.invoked = msg;
return new CompletableFuture<Object>() {
public Object get() throws InterruptedException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface ExchangeChannel extends Channel {
* @throws RemotingException
*/
@Deprecated
CompletableFuture<Object> request(Object request) throws RemotingException;
CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException;

/**
* send request.
Expand All @@ -46,7 +46,7 @@ public interface ExchangeChannel extends Channel {
* @throws RemotingException
*/
@Deprecated
CompletableFuture<Object> request(Object request, int timeout) throws RemotingException;
CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException;

/**
* send request.
Expand All @@ -55,7 +55,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException;
CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException;

/**
* send request.
Expand All @@ -65,7 +65,7 @@ public interface ExchangeChannel extends Channel {
* @return response future
* @throws RemotingException
*/
CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException;
CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException;

/**
* get message handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -44,14 +45,17 @@
/**
* DefaultFuture.
*/
public class DefaultFuture extends CompletableFuture<Object> {
public class DefaultFuture {

private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

// private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>();
private final CompletableFuture completableFuture;

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
30,
Expand All @@ -66,20 +70,18 @@ public class DefaultFuture extends CompletableFuture<Object> {
private volatile long sent;
private Timeout timeoutCheckTask;

private ExecutorService executor;
private final ExecutorService executor;

public ExecutorService getExecutor() {
return executor;
}

public void setExecutor(ExecutorService executor) {
this.executor = executor;
}

private DefaultFuture(Channel channel, Request request, int timeout) {
private DefaultFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.executor = executor;
this.completableFuture = completableFuture;
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
Expand All @@ -104,9 +106,8 @@ private static void timeoutCheck(DefaultFuture future) {
* @param timeout timeout
* @return a new DefaultFuture
*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout, executor, completableFuture);
// timeout check
timeoutCheck(future);
return future;
Expand Down Expand Up @@ -176,7 +177,6 @@ public static void received(Channel channel, Response response, boolean timeout)
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
Response errorResult = new Response(id);
errorResult.setStatus(Response.CLIENT_ERROR);
Expand All @@ -187,6 +187,14 @@ public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}

public boolean isDone() {
return this.completableFuture.isDone();
}

public Object get() throws ExecutionException, InterruptedException {
return this.completableFuture.get();
}

public void cancel() {
this.cancel(true);
}
Expand All @@ -196,11 +204,11 @@ private void doReceived(Response res) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
this.complete(res.getResult());
this.completableFuture.complete(res.getResult());
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
this.completableFuture.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
} else {
this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
this.completableFuture.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
}

// the result is returning, but the caller thread may still waiting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,22 +106,22 @@ public void send(Object message, boolean sent) throws RemotingException {
}

@Override
public CompletableFuture<Object> request(Object request) throws RemotingException {
return request(request, null);
public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException {
return request(request, null, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
return request(request, timeout, null);
public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException {
return request(request, timeout, null, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Expand All @@ -130,14 +130,14 @@ public CompletableFuture<Object> request(Object request, int timeout, ExecutorSe
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor, completableFuture);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
return completableFuture;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
import static org.apache.dubbo.remoting.Constants.TICKS_PER_WHEEL;
import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat;
import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;

/**
* DefaultMessageClient
Expand Down Expand Up @@ -66,8 +66,8 @@ public HeaderExchangeClient(Client client, boolean startTimer) {
}

@Override
public CompletableFuture<Object> request(Object request) throws RemotingException {
return channel.request(request);
public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException {
return channel.request(request, completableFuture);
}

@Override
Expand All @@ -81,18 +81,18 @@ public InetSocketAddress getRemoteAddress() {
}

@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
return channel.request(request, timeout);
public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException {
return channel.request(request, timeout, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
return channel.request(request, executor);
public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return channel.request(request, executor, completableFuture);
}

@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
return channel.request(request, timeout, executor);
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException {
return channel.request(request, timeout, executor, completableFuture);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static void dataPackageTest(int port) throws Exception {
sb.append("(" + random.nextLong() + ")");
Main.Data d = new Main.Data();
d.setData(sb.toString());
client.request(d).get();
client.request(d, new CompletableFuture()).get();
}
System.out.println("send finished.");
}
Expand All @@ -83,18 +83,18 @@ public void run() {

private static void test(int port) throws Exception {
ExchangeChannel client = Exchangers.connect(URL.valueOf("dubbo://localhost:" + port));
MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, int.class}, new Object[]{55, 25})).get();
MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, int.class}, new Object[]{55, 25}), new CompletableFuture()).get();
System.out.println("55+25=" + result.getResult());

for (int i = 0; i < 100; i++)
client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}));
client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}), new CompletableFuture());

for (int i = 0; i < 100; i++)
client.request(new Main.Data());
client.request(new Main.Data(), new CompletableFuture());

System.out.println("=====test invoke=====");
for (int i = 0; i < 100; i++) {
CompletableFuture<Object> future = client.request(new Main.Data());
CompletableFuture<Object> future = client.request(new Main.Data(), new CompletableFuture());
System.out.println("invoke and get");
System.out.println("invoke result:" + future.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
Expand Down Expand Up @@ -120,7 +121,9 @@ public void testClient() throws Exception {
int index = rd.nextInt(connectionCount);
ExchangeClient client = arrays.get(index);
// ExchangeClient client = arrays.get(0);
String output = (String) client.request(messageBlock).get();
CompletableFuture cf = new CompletableFuture();
client.request(messageBlock, cf);
String output = (String) cf.get();

if (output.lastIndexOf(messageBlock) < 0) {
System.out.println("send messageBlock;get " + output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -72,8 +73,9 @@ public void testClient() throws Throwable {
exchangeClients[i] = Exchangers.connect(url);
}

List<String> serverEnvironment = (List<String>) exchangeClients[0].request("environment").get();
List<String> serverScene = (List<String>) exchangeClients[0].request("scene").get();
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
List<String> serverEnvironment = (List<String>) exchangeClients[0].request("environment", completableFuture).get();
List<String> serverScene = (List<String>) exchangeClients[0].request("scene", completableFuture).get();

// Create some data for test
StringBuilder buf = new StringBuilder(length);
Expand Down Expand Up @@ -101,7 +103,8 @@ public void run() {
count.incrementAndGet();
ExchangeClient client = exchangeClients[index.getAndIncrement() % connections];
long start = System.currentTimeMillis();
String result = (String) client.request(data).get();
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
String result = (String) client.request(data, completableFuture).get();
long end = System.currentTimeMillis();
if (!data.equals(result)) {
throw new IllegalStateException("Invalid result " + result);
Expand Down
Loading