Skip to content

Commit 1523546

Browse files
committed
[pinpoint-apm#9932] Replace DefaultFuture with CompletableFuture
1 parent 79e933a commit 1523546

File tree

6 files changed

+80
-72
lines changed

6 files changed

+80
-72
lines changed

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/PinpointGrpcServer.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
3030
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
3131
import com.navercorp.pinpoint.profiler.context.thrift.CommandGrpcToThriftMessageConverter;
32-
import com.navercorp.pinpoint.rpc.DefaultFuture;
3332
import com.navercorp.pinpoint.rpc.PinpointSocketException;
3433
import com.navercorp.pinpoint.rpc.ResponseMessage;
3534
import com.navercorp.pinpoint.rpc.client.RequestManager;
@@ -44,7 +43,6 @@
4443
import com.navercorp.pinpoint.rpc.stream.StreamChannel;
4544
import com.navercorp.pinpoint.rpc.stream.StreamChannelRepository;
4645
import com.navercorp.pinpoint.rpc.stream.StreamException;
47-
import com.navercorp.pinpoint.rpc.util.Futures;
4846
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;
4947
import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseSerializerFactory;
5048
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
@@ -145,10 +143,10 @@ public CompletableFuture<ResponseMessage> request(GeneratedMessageV3 message) {
145143
return createFailedFuture(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
146144
}
147145

148-
DefaultFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
146+
CompletableFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
149147
requestObserver.onNext(request);
150148

151-
return Futures.toCompletableFuture(future);
149+
return future;
152150
}
153151

154152
// 1st message : client(collector) -> server(agent)
@@ -347,9 +345,9 @@ public void close(SocketStateCode toState) {
347345
}
348346

349347
private void setFailMessageToFuture(int responseId, String message) {
350-
DefaultFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
348+
CompletableFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
351349
if (future != null) {
352-
future.setFailure(new PinpointSocketException(message));
350+
future.completeExceptionally(new PinpointSocketException(message));
353351
}
354352
}
355353

rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.navercorp.pinpoint.rpc.client;
1818

19-
import com.navercorp.pinpoint.rpc.ChannelWriteFailListenableFuture;
2019
import com.navercorp.pinpoint.rpc.MessageListener;
2120
import com.navercorp.pinpoint.rpc.PinpointSocket;
2221
import com.navercorp.pinpoint.rpc.PinpointSocketException;
@@ -265,7 +264,7 @@ public void send(byte[] bytes) {
265264
@Override
266265
public CompletableFuture<Void> sendAsync(byte[] bytes) {
267266
ChannelFuture channelFuture = send0(bytes);
268-
return Futures.toCompletableFuture(channelFuture);
267+
return Futures.ioWriteFuture(channelFuture);
269268
}
270269

271270
@Override
@@ -344,11 +343,11 @@ public CompletableFuture<ResponseMessage> request(byte[] bytes) {
344343
}
345344
final int requestId = this.requestManager.nextRequestId();
346345
final RequestPacket request = new RequestPacket(requestId, bytes);
347-
final ChannelWriteFailListenableFuture<ResponseMessage> messageFuture = this.requestManager.register(request.getRequestId(), clientOption.getRequestTimeoutMillis());
348-
write0(request, messageFuture);
346+
final CompletableFuture<ResponseMessage> messageFuture = this.requestManager.register(request.getRequestId(), clientOption.getRequestTimeoutMillis());
349347

350-
return Futures.toCompletableFuture(messageFuture);
348+
write0(request, Futures.writeFailListener(messageFuture));
351349

350+
return messageFuture;
352351
}
353352

354353
@Override

rpc/src/main/java/com/navercorp/pinpoint/rpc/client/RequestManager.java

+38-31
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package com.navercorp.pinpoint.rpc.client;
1818

19-
import com.navercorp.pinpoint.rpc.ChannelWriteFailListenableFuture;
20-
import com.navercorp.pinpoint.rpc.DefaultFuture;
21-
import com.navercorp.pinpoint.rpc.FailureEventHandler;
2219
import com.navercorp.pinpoint.rpc.PinpointSocketException;
2320
import com.navercorp.pinpoint.rpc.ResponseMessage;
2421
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
@@ -29,13 +26,17 @@
2926
import org.jboss.netty.channel.Channel;
3027
import org.jboss.netty.util.Timeout;
3128
import org.jboss.netty.util.Timer;
29+
import org.jboss.netty.util.TimerTask;
3230

3331
import java.util.Map;
3432
import java.util.Objects;
33+
import java.util.concurrent.CompletableFuture;
3534
import java.util.concurrent.ConcurrentHashMap;
3635
import java.util.concurrent.ConcurrentMap;
3736
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.TimeoutException;
3838
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.function.BiConsumer;
3940

4041
/**
4142
* @author emeroad
@@ -46,7 +47,7 @@ public class RequestManager {
4647

4748
private final AtomicInteger requestId = new AtomicInteger(1);
4849

49-
private final ConcurrentMap<Integer, DefaultFuture<ResponseMessage>> requestMap = new ConcurrentHashMap<>();
50+
private final ConcurrentMap<Integer, CompletableFuture<ResponseMessage>> requestMap = new ConcurrentHashMap<>();
5051
// Have to move Timer into factory?
5152
private final Timer timer;
5253
private final long defaultTimeoutMillis;
@@ -60,30 +61,37 @@ public RequestManager(Timer timer, long defaultTimeoutMillis) {
6061
this.defaultTimeoutMillis = defaultTimeoutMillis;
6162
}
6263

63-
private FailureEventHandler createFailureEventHandler(final int requestId) {
64-
FailureEventHandler failureEventHandler = new FailureEventHandler() {
64+
private BiConsumer<ResponseMessage, Throwable> createFailureEventHandler(final int requestId) {
65+
return new BiConsumer<ResponseMessage, Throwable>() {
6566
@Override
66-
public boolean fireFailure() {
67-
DefaultFuture<ResponseMessage> future = removeMessageFuture(requestId);
68-
if (future != null) {
69-
// removed perfectly.
70-
return true;
67+
public void accept(ResponseMessage responseMessage, Throwable throwable) {
68+
if (throwable != null) {
69+
removeMessageFuture(requestId);
7170
}
72-
return false;
7371
}
7472
};
75-
return failureEventHandler;
7673
}
7774

78-
private void addTimeoutTask(DefaultFuture future, long timeoutMillis) {
75+
private <T> void addTimeoutTask(CompletableFuture<T> future, long timeoutMillis) {
7976
Objects.requireNonNull(future, "future");
8077

8178
try {
82-
Timeout timeout = timer.newTimeout(future, timeoutMillis, TimeUnit.MILLISECONDS);
83-
future.setTimeout(timeout);
79+
Timeout timeout = timer.newTimeout(new TimerTask() {
80+
@Override
81+
public void run(Timeout timeout) throws Exception {
82+
if (timeout.isCancelled()) {
83+
return;
84+
}
85+
if (future.isDone()) {
86+
return;
87+
}
88+
future.completeExceptionally(new TimeoutException("Timeout by RequestManager-TIMER"));
89+
}
90+
}, timeoutMillis, TimeUnit.MILLISECONDS);
91+
future.thenAccept(t -> timeout.cancel());
8492
} catch (IllegalStateException e) {
8593
// this case is that timer has been shutdown. That maybe just means that socket has been closed.
86-
future.setFailure(new PinpointSocketException("socket closed")) ;
94+
future.completeExceptionally(new PinpointSocketException("socket closed")) ;
8795
}
8896
}
8997

@@ -93,7 +101,7 @@ public int nextRequestId() {
93101

94102
public void messageReceived(ResponsePacket responsePacket, String objectUniqName) {
95103
final int requestId = responsePacket.getRequestId();
96-
final DefaultFuture<ResponseMessage> future = removeMessageFuture(requestId);
104+
final CompletableFuture<ResponseMessage> future = removeMessageFuture(requestId);
97105
if (future == null) {
98106
logger.warn("future not found:{}, objectUniqName:{}", responsePacket, objectUniqName);
99107
return;
@@ -102,12 +110,12 @@ public void messageReceived(ResponsePacket responsePacket, String objectUniqName
102110
}
103111

104112
ResponseMessage response = ResponseMessage.wrap(responsePacket.getPayload());
105-
future.setResult(response);
113+
future.complete(response);
106114
}
107115

108116
public void messageReceived(ResponsePacket responsePacket, PinpointServer pinpointServer) {
109117
final int requestId = responsePacket.getRequestId();
110-
final DefaultFuture<ResponseMessage> future = removeMessageFuture(requestId);
118+
final CompletableFuture<ResponseMessage> future = removeMessageFuture(requestId);
111119
if (future == null) {
112120
logger.warn("future not found:{}, pinpointServer:{}", responsePacket, pinpointServer);
113121
return;
@@ -116,33 +124,32 @@ public void messageReceived(ResponsePacket responsePacket, PinpointServer pinpoi
116124
}
117125

118126
ResponseMessage response = ResponseMessage.wrap(responsePacket.getPayload());
119-
future.setResult(response);
127+
future.complete(response);
120128
}
121129

122-
public DefaultFuture<ResponseMessage> removeMessageFuture(int requestId) {
130+
public CompletableFuture<ResponseMessage> removeMessageFuture(int requestId) {
123131
return this.requestMap.remove(requestId);
124132
}
125133

126134
public void messageReceived(RequestPacket requestPacket, Channel channel) {
127135
logger.error("unexpectedMessage received:{} address:{}", requestPacket, channel.getRemoteAddress());
128136
}
129137

130-
public ChannelWriteFailListenableFuture<ResponseMessage> register(int requestId) {
138+
public CompletableFuture<ResponseMessage> register(int requestId) {
131139
return register(requestId, defaultTimeoutMillis);
132140
}
133141

134-
public ChannelWriteFailListenableFuture<ResponseMessage> register(int requestId, long timeoutMillis) {
142+
public CompletableFuture<ResponseMessage> register(int requestId, long timeoutMillis) {
135143
// shutdown check
136-
final ChannelWriteFailListenableFuture<ResponseMessage> responseFuture = new ChannelWriteFailListenableFuture<>(timeoutMillis);
144+
final CompletableFuture<ResponseMessage> responseFuture = new CompletableFuture<>();
137145

138-
final DefaultFuture<ResponseMessage> old = this.requestMap.put(requestId, responseFuture);
146+
final CompletableFuture<ResponseMessage> old = this.requestMap.put(requestId, responseFuture);
139147
if (old != null) {
140148
throw new PinpointSocketException("unexpected error. old future exist:" + old + " id:" + requestId);
141149
}
142-
143150
// when future fails, put a handle in order to remove a failed future in the requestMap.
144-
FailureEventHandler removeTable = createFailureEventHandler(requestId);
145-
responseFuture.setFailureEventHandler(removeTable);
151+
BiConsumer<ResponseMessage, Throwable> removeTable = createFailureEventHandler(requestId);
152+
responseFuture.whenComplete(removeTable);
146153

147154
addTimeoutTask(responseFuture, timeoutMillis);
148155
return responseFuture;
@@ -190,8 +197,8 @@ public void close() {
190197
// }
191198
// }
192199
int requestFailCount = 0;
193-
for (Map.Entry<Integer, DefaultFuture<ResponseMessage>> entry : requestMap.entrySet()) {
194-
if (entry.getValue().setFailure(closed)) {
200+
for (Map.Entry<Integer, CompletableFuture<ResponseMessage>> entry : requestMap.entrySet()) {
201+
if (entry.getValue().completeExceptionally(closed)) {
195202
requestFailCount++;
196203
}
197204
}

rpc/src/main/java/com/navercorp/pinpoint/rpc/server/DefaultPinpointServer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.navercorp.pinpoint.rpc.server;
1818

19-
import com.navercorp.pinpoint.rpc.ChannelWriteFailListenableFuture;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
2120
import com.navercorp.pinpoint.rpc.client.RequestManager;
2221
import com.navercorp.pinpoint.rpc.client.WriteFailFutureListener;
@@ -202,10 +201,10 @@ public CompletableFuture<ResponseMessage> request(byte[] payload) {
202201

203202
final int requestId = this.requestManager.nextRequestId();
204203
RequestPacket requestPacket = new RequestPacket(requestId, payload);
205-
ChannelWriteFailListenableFuture<ResponseMessage> responseFuture = this.requestManager.register(requestPacket.getRequestId());
206-
write0(requestPacket, responseFuture);
204+
CompletableFuture<ResponseMessage> responseFuture = this.requestManager.register(requestPacket.getRequestId());
205+
write0(requestPacket, Futures.writeFailListener(responseFuture));
207206

208-
return Futures.toCompletableFuture(responseFuture);
207+
return responseFuture;
209208
}
210209

211210
@Override

rpc/src/main/java/com/navercorp/pinpoint/rpc/util/Futures.java

+9-12
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,27 @@
11
package com.navercorp.pinpoint.rpc.util;
22

3-
import com.navercorp.pinpoint.rpc.Future;
4-
import com.navercorp.pinpoint.rpc.FutureListener;
53
import org.jboss.netty.channel.ChannelFuture;
64
import org.jboss.netty.channel.ChannelFutureListener;
75

86
import java.util.concurrent.CompletableFuture;
97

108
public final class Futures {
11-
public static <T> CompletableFuture<T> toCompletableFuture(Future<T> future) {
9+
private Futures() {
10+
}
1211

13-
CompletableFuture<T> completableFuture = new CompletableFuture<>();
14-
future.setListener(new FutureListener<T>() {
12+
public static <T> ChannelFutureListener writeFailListener(CompletableFuture<T> completableFuture) {
13+
return new ChannelFutureListener() {
1514
@Override
16-
public void onComplete(Future<T> future) {
17-
if (future.isSuccess()) {
18-
completableFuture.complete(future.getResult());
19-
} else {
15+
public void operationComplete(ChannelFuture future) throws Exception {
16+
if (!future.isSuccess()) {
2017
completableFuture.completeExceptionally(future.getCause());
2118
}
2219
}
23-
});
24-
return completableFuture;
20+
};
2521
}
2622

27-
public static CompletableFuture<Void> toCompletableFuture(ChannelFuture future) {
23+
24+
public static CompletableFuture<Void> ioWriteFuture(ChannelFuture future) {
2825

2926
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
3027
future.addListener(new ChannelFutureListener() {

rpc/src/test/java/com/navercorp/pinpoint/rpc/client/RequestManagerTest.java

+22-14
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@
1616

1717
package com.navercorp.pinpoint.rpc.client;
1818

19-
import com.navercorp.pinpoint.rpc.DefaultFuture;
20-
import com.navercorp.pinpoint.rpc.Future;
2119
import com.navercorp.pinpoint.rpc.ResponseMessage;
2220
import org.apache.logging.log4j.LogManager;
2321
import org.apache.logging.log4j.Logger;
2422
import org.jboss.netty.util.HashedWheelTimer;
2523
import org.jboss.netty.util.Timer;
2624
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.Assertions;
2726
import org.junit.jupiter.api.BeforeEach;
2827
import org.junit.jupiter.api.Test;
2928

29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.TimeoutException;
3133

3234
import static org.assertj.core.api.Assertions.assertThat;
33-
import static org.junit.jupiter.api.Assertions.assertFalse;
3435
import static org.junit.jupiter.api.Assertions.assertNull;
35-
import static org.junit.jupiter.api.Assertions.assertTrue;
3636

3737
/**
3838
* @author emeroad
@@ -63,24 +63,32 @@ public void tearDown() throws Exception {
6363
@Test
6464
public void testRegisterRequest() throws Exception {
6565
final int requestId = requestManager.nextRequestId();
66-
final Future<ResponseMessage> future = requestManager.register(requestId, 50);
67-
68-
assertTrue(future.await(200));
69-
assertTrue(future.isReady());
70-
assertFalse(future.isSuccess());
71-
assertThat(future.getCause().getMessage()).contains("timeout");
72-
logger.debug(future.getCause().getMessage());
66+
final CompletableFuture<ResponseMessage> future = requestManager.register(requestId, 50);
67+
68+
try {
69+
future.get(3000, TimeUnit.MILLISECONDS);
70+
Assertions.fail();
71+
} catch (InterruptedException | ExecutionException e) {
72+
Assertions.fail();
73+
} catch (TimeoutException th) {
74+
assertThat(th.getMessage()).contains("timeout");
75+
}
76+
// assertTrue(future.await(200));
77+
// assertTrue(future.isReady());
78+
// assertFalse(future.isSuccess());
79+
// assertThat(future.getCause().getMessage()).contains("timeout");
80+
// logger.debug(future.getCause().getMessage());
7381
}
7482

7583

7684
@Test
7785
public void testRemoveMessageFuture() throws Exception {
7886
int requestId = requestManager.nextRequestId();
7987

80-
DefaultFuture<ResponseMessage> future = requestManager.register(requestId, 2000);
81-
future.setFailure(new RuntimeException());
88+
CompletableFuture<ResponseMessage> future = requestManager.register(requestId, 2000);
89+
future.completeExceptionally(new RuntimeException());
8290

83-
Future<ResponseMessage> nullFuture = requestManager.removeMessageFuture(requestId);
91+
CompletableFuture<ResponseMessage> nullFuture = requestManager.removeMessageFuture(requestId);
8492
assertNull(nullFuture);
8593
}
8694

0 commit comments

Comments
 (0)