26
26
import com .navercorp .pinpoint .common .util .Assert ;
27
27
import com .navercorp .pinpoint .common .util .StringUtils ;
28
28
import com .navercorp .pinpoint .io .request .Message ;
29
- import com .navercorp .pinpoint .rpc .Future ;
30
29
import com .navercorp .pinpoint .rpc .PinpointSocketException ;
31
30
import com .navercorp .pinpoint .rpc .ResponseMessage ;
32
31
import com .navercorp .pinpoint .rpc .common .SocketStateCode ;
35
34
import com .navercorp .pinpoint .thrift .io .HeaderTBaseDeserializer ;
36
35
import io .grpc .Status ;
37
36
import io .grpc .StatusRuntimeException ;
38
- import org .apache .thrift .TBase ;
39
- import org .apache .logging .log4j .Logger ;
40
37
import org .apache .logging .log4j .LogManager ;
38
+ import org .apache .logging .log4j .Logger ;
39
+ import org .apache .thrift .TBase ;
41
40
import org .springframework .web .bind .annotation .GetMapping ;
42
41
import org .springframework .web .bind .annotation .RequestMapping ;
43
42
import org .springframework .web .bind .annotation .RequestParam ;
47
46
import java .util .ArrayList ;
48
47
import java .util .List ;
49
48
import java .util .Objects ;
49
+ import java .util .concurrent .CompletableFuture ;
50
+ import java .util .concurrent .ExecutionException ;
51
+ import java .util .concurrent .TimeUnit ;
52
+ import java .util .concurrent .TimeoutException ;
50
53
51
54
/**
52
55
* @author Taejin Koo
@@ -158,7 +161,7 @@ private List<GrpcAgentConnection> getGrpcAgentConnectionList(final String applic
158
161
private CheckConnectionStatusResult request (GrpcAgentConnection grpcAgentConnection , int checkCount ) {
159
162
logger .info ("Ping message will be sent. collector => {}." , grpcAgentConnection .getDestClusterKey ());
160
163
161
- Future <ResponseMessage > response = null ;
164
+ CompletableFuture <ResponseMessage > response = null ;
162
165
try {
163
166
response = request0 (grpcAgentConnection , checkCount );
164
167
} catch (StatusRuntimeException e ) {
@@ -173,15 +176,8 @@ private CheckConnectionStatusResult request(GrpcAgentConnection grpcAgentConnect
173
176
clearConnection (grpcAgentConnection );
174
177
return CheckConnectionStatusResult .FAIL_AND_CLEAR_CONNECTION ;
175
178
}
176
-
177
- if (!response .isSuccess ()) {
178
- Throwable cause = response .getCause ();
179
- logger .warn ("Failed while request message. message:{}" , cause .getMessage (), cause );
180
- return CheckConnectionStatusResult .FAIL ;
181
- }
182
-
183
179
try {
184
- ResponseMessage result = response .getResult ( );
180
+ ResponseMessage result = response .get ( 3000 , TimeUnit . MILLISECONDS );
185
181
Message <TBase <?, ?>> deserialize = tBaseDeserializer .deserialize (result .getMessage ());
186
182
187
183
TBase <?, ?> data = deserialize .getData ();
@@ -191,9 +187,11 @@ private CheckConnectionStatusResult request(GrpcAgentConnection grpcAgentConnect
191
187
}
192
188
}
193
189
logger .warn ("Receive unexpected response data. data:{}" , data );
194
- } catch (Exception e ) {
195
- logger .warn ("Exception occurred while handles response message. message:{}" , e .getMessage (), e );
190
+ } catch (Exception cause ) {
191
+ logger .warn ("Failed while request message. message:{}" , cause .getMessage (), cause );
192
+ return CheckConnectionStatusResult .FAIL ;
196
193
}
194
+
197
195
return CheckConnectionStatusResult .FAIL ;
198
196
}
199
197
@@ -204,12 +202,14 @@ private void clearConnection(GrpcAgentConnection grpcAgentConnection) {
204
202
205
203
// If the occur excption in connection, do not retry
206
204
// Multiple attempts only at timeout
207
- private Future <ResponseMessage > request0 (GrpcAgentConnection grpcAgentConnection , int maxCount ) {
205
+ private CompletableFuture <ResponseMessage > request0 (GrpcAgentConnection grpcAgentConnection , int maxCount ) {
208
206
for (int i = 0 ; i < maxCount ; i ++) {
209
- Future <ResponseMessage > responseFuture = grpcAgentConnection .request (CONNECTION_CHECK_COMMAND );
210
- boolean await = responseFuture . await ();
211
- if ( await ) {
207
+ CompletableFuture <ResponseMessage > responseFuture = grpcAgentConnection .request (CONNECTION_CHECK_COMMAND );
208
+ try {
209
+ responseFuture . get ( 3000 , TimeUnit . MILLISECONDS );
212
210
return responseFuture ;
211
+ } catch (InterruptedException | ExecutionException | TimeoutException e ) {
212
+ throw new PinpointSocketException (e );
213
213
}
214
214
}
215
215
0 commit comments