Skip to content

Commit c3cf897

Browse files
committed
[pinpoint-apm#9932] Replace DefaultFuture with CompletableFuture
1 parent 97e287f commit c3cf897

File tree

22 files changed

+157
-207
lines changed

22 files changed

+157
-207
lines changed

collector/src/main/java/com/navercorp/pinpoint/collector/controller/ClusterPointController.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,12 @@ private CompletableFuture<ResponseMessage> request0(GrpcAgentConnection grpcAgen
208208
try {
209209
responseFuture.get(3000, TimeUnit.MILLISECONDS);
210210
return responseFuture;
211-
} catch (InterruptedException | ExecutionException | TimeoutException e) {
211+
} catch (InterruptedException e) {
212+
Thread.currentThread().interrupt();
213+
throw new PinpointSocketException(e);
214+
} catch (ExecutionException e) {
215+
throw new PinpointSocketException(e.getCause());
216+
} catch (TimeoutException e) {
212217
throw new PinpointSocketException(e);
213218
}
214219
}

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/PinpointGrpcServer.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
3030
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
3131
import com.navercorp.pinpoint.profiler.context.thrift.CommandGrpcToThriftMessageConverter;
32-
import com.navercorp.pinpoint.rpc.DefaultFuture;
3332
import com.navercorp.pinpoint.rpc.PinpointSocketException;
3433
import com.navercorp.pinpoint.rpc.ResponseMessage;
3534
import com.navercorp.pinpoint.rpc.client.RequestManager;
@@ -44,7 +43,6 @@
4443
import com.navercorp.pinpoint.rpc.stream.StreamChannel;
4544
import com.navercorp.pinpoint.rpc.stream.StreamChannelRepository;
4645
import com.navercorp.pinpoint.rpc.stream.StreamException;
47-
import com.navercorp.pinpoint.rpc.util.Futures;
4846
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;
4947
import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseSerializerFactory;
5048
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
@@ -145,10 +143,9 @@ public CompletableFuture<ResponseMessage> request(GeneratedMessageV3 message) {
145143
return createFailedFuture(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
146144
}
147145

148-
DefaultFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
146+
CompletableFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
149147
requestObserver.onNext(request);
150-
151-
return Futures.toCompletableFuture(future);
148+
return future;
152149
}
153150

154151
// 1st message : client(collector) -> server(agent)
@@ -347,9 +344,9 @@ public void close(SocketStateCode toState) {
347344
}
348345

349346
private void setFailMessageToFuture(int responseId, String message) {
350-
DefaultFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
347+
CompletableFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
351348
if (future != null) {
352-
future.setFailure(new PinpointSocketException(message));
349+
future.completeExceptionally(new PinpointSocketException(message));
353350
}
354351
}
355352

profiler-test/src/main/java/com/navercorp/pinpoint/test/TestTcpDataSender.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
import java.util.List;
3232
import java.util.Map.Entry;
3333
import java.util.NoSuchElementException;
34-
import java.util.concurrent.CompletableFuture;
35-
import java.util.function.Consumer;
34+
import java.util.function.BiConsumer;
3635

3736
/**
3837
* @author Jongho Moon
@@ -140,7 +139,7 @@ public boolean request(MetaDataType data, int retry) {
140139
}
141140

142141
@Override
143-
public boolean request(MetaDataType data, Consumer<CompletableFuture<ResponseMessage>> listener) {
142+
public boolean request(MetaDataType data, BiConsumer<ResponseMessage, Throwable> listener) {
144143
addData(data);
145144
return true;
146145
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import java.util.Objects;
3030
import java.util.Timer;
3131
import java.util.TimerTask;
32+
import java.util.concurrent.ExecutionException;
3233
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.TimeoutException;
3335
import java.util.concurrent.atomic.AtomicInteger;
3436

3537
/**
@@ -176,9 +178,9 @@ private boolean sendAgentInfo() {
176178
AgentInfo agentInfo = agentInfoFactory.createAgentInfo();
177179

178180
logger.info("Sending AgentInfo {}", agentInfo);
179-
ResponseFutureListener<ResponseMessage> listener = new ResponseFutureListener<>();
181+
ResponseFutureListener<ResponseMessage, Throwable> listener = new ResponseFutureListener<>();
180182
dataSender.request(agentInfo, listener);
181-
ResponseMessage responseMessage = listener.getResponseFeture().get(3000, TimeUnit.MILLISECONDS);
183+
ResponseMessage responseMessage = listener.getResponseFuture().get(3000, TimeUnit.MILLISECONDS);
182184
if (responseMessage == null) {
183185
logger.warn("result not set");
184186
return false;
@@ -188,11 +190,20 @@ private boolean sendAgentInfo() {
188190
logger.warn("request unsuccessful. Cause : {}", result.getMessage());
189191
}
190192
return result.isSuccess();
191-
} catch (Throwable th) {
192-
logger.warn("failed to send agent info", th);
193+
} catch (ExecutionException ex) {
194+
logError(ex.getCause());
195+
} catch (InterruptedException ex) {
196+
Thread.currentThread().interrupt();
197+
logError(ex);
198+
} catch (TimeoutException ex) {
199+
logError(ex);
193200
}
194201
return false;
195202
}
203+
204+
private void logError(Throwable cause) {
205+
logger.warn("failed to send agent info", cause);
206+
}
196207
}
197208

198209
public static class Builder {

profiler/src/main/java/com/navercorp/pinpoint/profiler/ResponseFutureListener.java

+8-15
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package com.navercorp.pinpoint.profiler;
1818

1919
import java.util.concurrent.CompletableFuture;
20-
import java.util.function.Consumer;
20+
import java.util.function.BiConsumer;
2121

22-
public class ResponseFutureListener<T> implements Consumer<CompletableFuture<T>> {
22+
public class ResponseFutureListener<T, U extends Throwable> implements BiConsumer<T, U> {
2323

2424
private final CompletableFuture<T> future;
2525

@@ -28,22 +28,15 @@ public ResponseFutureListener() {
2828
}
2929

3030
@Override
31-
public void accept(CompletableFuture<T> cf) {
32-
if (!cf.isDone()) {
33-
this.future.completeExceptionally(new IllegalStateException("ResponseMessage future is not complete"));
34-
return;
31+
public void accept(T message, U th) {
32+
if (message != null) {
33+
future.complete(message);
34+
} else {
35+
future.completeExceptionally(th);
3536
}
36-
37-
cf.whenComplete((responseMessage, throwable) -> {
38-
if (throwable != null) {
39-
future.completeExceptionally(throwable);
40-
} else {
41-
future.complete(responseMessage);
42-
}
43-
});
4437
}
4538

46-
public CompletableFuture<T> getResponseFeture() {
39+
public CompletableFuture<T> getResponseFuture() {
4740
return future;
4841
}
4942
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/EmptyDataSender.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import com.navercorp.pinpoint.rpc.ResponseMessage;
2020

21-
import java.util.concurrent.CompletableFuture;
22-
import java.util.function.Consumer;
21+
import java.util.function.BiConsumer;
2322

2423

2524
/**
@@ -56,7 +55,7 @@ public boolean request(T data, int retry) {
5655

5756

5857
@Override
59-
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
58+
public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener) {
6059
return false;
6160
}
6261

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/EnhancedDataSender.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import com.navercorp.pinpoint.rpc.ResponseMessage;
2020

21-
import java.util.concurrent.CompletableFuture;
22-
import java.util.function.Consumer;
21+
import java.util.function.BiConsumer;
2322

2423

2524
/**
@@ -31,7 +30,7 @@ public interface EnhancedDataSender<T> extends DataSender<T> {
3130

3231
boolean request(T data, int retry);
3332

34-
boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener);
33+
boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener);
3534

3635

3736
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/ListenerableRequestMessage.java

-52
This file was deleted.

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/LoggingDataSender.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import org.apache.logging.log4j.LogManager;
2121
import org.apache.logging.log4j.Logger;
2222

23-
import java.util.concurrent.CompletableFuture;
24-
import java.util.function.Consumer;
23+
import java.util.function.BiConsumer;
2524

2625

2726
/**
@@ -58,7 +57,7 @@ public boolean request(T data, int retry) {
5857

5958

6059
@Override
61-
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
60+
public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener) {
6261
logger.info("request tBase:{} FutureListener:{}", data, listener);
6362
return false;
6463
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RequestMessage.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import com.navercorp.pinpoint.rpc.ResponseMessage;
2020

21-
import java.util.concurrent.CompletableFuture;
22-
import java.util.function.Consumer;
21+
import java.util.function.BiConsumer;
2322

2423
/**
2524
* @author Woonduk Kang(emeroad)
@@ -30,6 +29,6 @@ public interface RequestMessage<M> {
3029

3130
int getRetryCount();
3231

33-
Consumer<CompletableFuture<ResponseMessage>> getFutureListener();
32+
BiConsumer<ResponseMessage, Throwable> getFutureListener();
3433

3534
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RequestMessageFactory.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919

2020
import com.navercorp.pinpoint.rpc.ResponseMessage;
2121

22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.function.Consumer;
22+
import java.util.function.BiConsumer;
2423

2524
/**
2625
* @author Woonduk Kang(emeroad)
@@ -31,11 +30,11 @@ private RequestMessageFactory() {
3130
}
3231

3332
public static <T> RequestMessage<T> request(T message, int retryCount) {
34-
return new RetryRequestMessage<>(message, retryCount);
33+
return new RetryRequestMessage<>(message, retryCount, null);
3534
}
3635

37-
public static <T> RequestMessage<T> request(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
38-
return new ListenerableRequestMessage<>(message, futureListener);
36+
public static <T> RequestMessage<T> request(T message, int retryCount, BiConsumer<ResponseMessage, Throwable> futureListener) {
37+
return new RetryRequestMessage<>(message, retryCount, futureListener);
3938
}
4039

4140
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RetryRequestMessage.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
2120

22-
import java.util.concurrent.CompletableFuture;
23-
import java.util.function.Consumer;
21+
import java.util.function.BiConsumer;
2422

2523
/**
2624
* @author Woonduk Kang(emeroad)
2725
*/
28-
public class RetryRequestMessage<T> implements RequestMessage<T> {
26+
class RetryRequestMessage<T> implements RequestMessage<T> {
2927
private final T message;
3028
private final int retryCount;
29+
private final BiConsumer<ResponseMessage, Throwable> futureListener;
3130

3231

33-
RetryRequestMessage(T message, int retryCount) {
32+
RetryRequestMessage(T message, int retryCount, BiConsumer<ResponseMessage, Throwable> futureListener) {
3433
this.message = message;
3534
this.retryCount = retryCount;
35+
this.futureListener = futureListener;
3636
}
3737

3838

@@ -47,7 +47,7 @@ public int getRetryCount() {
4747
}
4848

4949
@Override
50-
public Consumer<CompletableFuture<ResponseMessage>> getFutureListener() {
51-
return null;
50+
public BiConsumer<ResponseMessage, Throwable> getFutureListener() {
51+
return futureListener;
5252
}
5353
}

0 commit comments

Comments
 (0)