Skip to content

Commit 3028cfe

Browse files
committed
[pinpoint-apm#9932] Reduce thrift dependency
1 parent ed4e8a4 commit 3028cfe

32 files changed

+343
-378
lines changed

profiler-test/src/main/java/com/navercorp/pinpoint/test/TestTcpDataSender.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
import com.navercorp.pinpoint.profiler.metadata.SqlMetaData;
2121
import com.navercorp.pinpoint.profiler.metadata.StringMetaData;
2222
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
23-
import com.navercorp.pinpoint.rpc.FutureListener;
2423
import com.navercorp.pinpoint.rpc.ResponseMessage;
25-
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;
2624
import com.navercorp.pinpoint.test.util.BiHashMap;
2725
import com.navercorp.pinpoint.test.util.Pair;
2826

@@ -33,6 +31,8 @@
3331
import java.util.List;
3432
import java.util.Map.Entry;
3533
import java.util.NoSuchElementException;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.function.Consumer;
3636

3737
/**
3838
* @author Jongho Moon
@@ -140,20 +140,11 @@ public boolean request(MetaDataType data, int retry) {
140140
}
141141

142142
@Override
143-
public boolean request(MetaDataType data, FutureListener<ResponseMessage> listener) {
143+
public boolean request(MetaDataType data, Consumer<CompletableFuture<ResponseMessage>> listener) {
144144
addData(data);
145145
return true;
146146
}
147147

148-
@Override
149-
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
150-
return false;
151-
}
152-
153-
@Override
154-
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
155-
return false;
156-
}
157148

158149
public String getApiDescription(int id) {
159150
return syncGet(apiIdMap, id);

profiler/src/main/java/com/navercorp/pinpoint/profiler/AgentInfoSender.java

+13-23
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,21 @@
1616

1717
package com.navercorp.pinpoint.profiler;
1818

19-
import java.util.Timer;
20-
import java.util.TimerTask;
21-
import java.util.concurrent.atomic.AtomicInteger;
22-
23-
import java.util.Objects;
2419
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
2520
import com.navercorp.pinpoint.profiler.metadata.AgentInfo;
2621
import com.navercorp.pinpoint.profiler.metadata.MetaDataType;
22+
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
2723
import com.navercorp.pinpoint.profiler.sender.ResultResponse;
2824
import com.navercorp.pinpoint.profiler.util.AgentInfoFactory;
29-
import com.navercorp.pinpoint.rpc.DefaultFuture;
3025
import com.navercorp.pinpoint.rpc.ResponseMessage;
31-
import org.apache.logging.log4j.Logger;
3226
import org.apache.logging.log4j.LogManager;
27+
import org.apache.logging.log4j.Logger;
3328

34-
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
29+
import java.util.Objects;
30+
import java.util.Timer;
31+
import java.util.TimerTask;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
3534

3635
/**
3736
* @author emeroad
@@ -175,31 +174,22 @@ public void run() {
175174
private boolean sendAgentInfo() {
176175
try {
177176
AgentInfo agentInfo = agentInfoFactory.createAgentInfo();
178-
final DefaultFuture<ResponseMessage> future = new DefaultFuture<>();
179177

180178
logger.info("Sending AgentInfo {}", agentInfo);
181-
dataSender.request(agentInfo, new ResponseMessageFutureListener(future));
182-
if (!future.await()) {
183-
logger.warn("request timed out while waiting for response.");
184-
return false;
185-
}
186-
if (!future.isSuccess()) {
187-
Throwable t = future.getCause();
188-
logger.warn("request failed.", t);
189-
return false;
190-
}
191-
ResponseMessage responseMessage = future.getResult();
179+
ResponseFutureListener<ResponseMessage> listener = new ResponseFutureListener<>();
180+
dataSender.request(agentInfo, listener);
181+
ResponseMessage responseMessage = listener.getResponseFeture().get(3000, TimeUnit.MILLISECONDS);
192182
if (responseMessage == null) {
193-
logger.warn("result not set.");
183+
logger.warn("result not set");
194184
return false;
195185
}
196186
final ResultResponse result = messageConverter.toMessage(responseMessage);
197187
if (!result.isSuccess()) {
198188
logger.warn("request unsuccessful. Cause : {}", result.getMessage());
199189
}
200190
return result.isSuccess();
201-
} catch (Exception e) {
202-
logger.warn("failed to send agent info.", e);
191+
} catch (Throwable th) {
192+
logger.warn("failed to send agent info", th);
203193
}
204194
return false;
205195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2014 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.profiler;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Consumer;
21+
22+
public class ResponseFutureListener<T> implements Consumer<CompletableFuture<T>> {
23+
24+
private final CompletableFuture<T> future;
25+
26+
public ResponseFutureListener() {
27+
this.future = new CompletableFuture<>();
28+
}
29+
30+
@Override
31+
public void accept(CompletableFuture<T> cf) {
32+
if (!cf.isDone()) {
33+
this.future.completeExceptionally(new IllegalStateException("ResponseMessage future is not complete"));
34+
return;
35+
}
36+
37+
cf.whenComplete((responseMessage, throwable) -> {
38+
if (throwable != null) {
39+
future.completeExceptionally(throwable);
40+
} else {
41+
future.complete(responseMessage);
42+
}
43+
});
44+
}
45+
46+
public CompletableFuture<T> getResponseFeture() {
47+
return future;
48+
}
49+
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/ResponseMessageFutureListener.java

-51
This file was deleted.

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/EmptyDataSender.java

+4-13
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
import com.navercorp.pinpoint.rpc.FutureListener;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
21-
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
2223

2324

2425
/**
@@ -55,17 +56,7 @@ public boolean request(T data, int retry) {
5556

5657

5758
@Override
58-
public boolean request(T data, FutureListener<ResponseMessage> listener) {
59-
return false;
60-
}
61-
62-
@Override
63-
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
64-
return false;
65-
}
66-
67-
@Override
68-
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
59+
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
6960
return false;
7061
}
7162

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/EnhancedDataSender.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
import com.navercorp.pinpoint.rpc.FutureListener;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
21-
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;
20+
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
2223

2324

2425
/**
@@ -27,10 +28,10 @@
2728
public interface EnhancedDataSender<T> extends DataSender<T> {
2829

2930
boolean request(T data);
31+
3032
boolean request(T data, int retry);
31-
boolean request(T data, FutureListener<ResponseMessage> listener);
3233

33-
boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener);
34-
boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener);
34+
boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener);
35+
3536

3637
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/ListenerableRequestMessage.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
import com.navercorp.pinpoint.rpc.FutureListener;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
2120

21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
23+
2224
/**
2325
* @author Woonduk Kang(emeroad)
2426
*/
2527
class ListenerableRequestMessage<T> implements RequestMessage<T> {
2628
private final T message;
27-
private final FutureListener<ResponseMessage> futureListener;
29+
private final Consumer<CompletableFuture<ResponseMessage>> futureListener;
2830

2931

30-
ListenerableRequestMessage(T message, FutureListener<ResponseMessage> futureListener) {
32+
ListenerableRequestMessage(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
3133
this.message = message;
3234
this.futureListener = futureListener;
3335
}
@@ -44,7 +46,7 @@ public int getRetryCount() {
4446
}
4547

4648
@Override
47-
public FutureListener<ResponseMessage> getFutureListener() {
49+
public Consumer<CompletableFuture<ResponseMessage>> getFutureListener() {
4850
return futureListener;
4951
}
5052
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/LoggingDataSender.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
import com.navercorp.pinpoint.rpc.FutureListener;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
21-
import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener;
22-
23-
import org.apache.logging.log4j.Logger;
2420
import org.apache.logging.log4j.LogManager;
21+
import org.apache.logging.log4j.Logger;
22+
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.function.Consumer;
2525

2626

2727
/**
@@ -58,21 +58,9 @@ public boolean request(T data, int retry) {
5858

5959

6060
@Override
61-
public boolean request(T data, FutureListener<ResponseMessage> listener) {
61+
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
6262
logger.info("request tBase:{} FutureListener:{}", data, listener);
6363
return false;
6464
}
6565

66-
@Override
67-
public boolean addReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
68-
logger.info("addReconnectEventListener eventListener:{}", eventListener);
69-
return false;
70-
}
71-
72-
@Override
73-
public boolean removeReconnectEventListener(PinpointClientReconnectEventListener eventListener) {
74-
logger.info("removeReconnectEventListener eventListener:{}", eventListener);
75-
return false;
76-
}
77-
7866
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.navercorp.pinpoint.profiler.sender;
2+
3+
import java.util.function.Consumer;
4+
5+
public interface ReconnectEventListenerRegistry<T> {
6+
boolean addEventListener(Consumer<T> eventListener);
7+
8+
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RequestMessage.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package com.navercorp.pinpoint.profiler.sender;
1818

19-
import com.navercorp.pinpoint.rpc.FutureListener;
2019
import com.navercorp.pinpoint.rpc.ResponseMessage;
2120

21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Consumer;
23+
2224
/**
2325
* @author Woonduk Kang(emeroad)
2426
*/
@@ -28,6 +30,6 @@ public interface RequestMessage<M> {
2830

2931
int getRetryCount();
3032

31-
FutureListener<ResponseMessage> getFutureListener();
33+
Consumer<CompletableFuture<ResponseMessage>> getFutureListener();
3234

3335
}

profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/RequestMessageFactory.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717
package com.navercorp.pinpoint.profiler.sender;
1818

1919

20-
import com.navercorp.pinpoint.rpc.FutureListener;
2120
import com.navercorp.pinpoint.rpc.ResponseMessage;
2221

22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.function.Consumer;
24+
2325
/**
2426
* @author Woonduk Kang(emeroad)
2527
*/
@@ -32,7 +34,7 @@ public static <T> RequestMessage<T> request(T message, int retryCount) {
3234
return new RetryRequestMessage<>(message, retryCount);
3335
}
3436

35-
public static <T> RequestMessage<T> request(T message, FutureListener<ResponseMessage> futureListener) {
37+
public static <T> RequestMessage<T> request(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
3638
return new ListenerableRequestMessage<>(message, futureListener);
3739
}
3840

0 commit comments

Comments
 (0)