16
16
17
17
package com .navercorp .pinpoint .web .websocket ;
18
18
19
- import com .navercorp .pinpoint .common .util .CpuUtils ;
20
19
import com .navercorp .pinpoint .common .profiler .concurrent .PinpointThreadFactory ;
20
+ import com .navercorp .pinpoint .common .util .CpuUtils ;
21
21
import com .navercorp .pinpoint .rpc .util .ClassUtils ;
22
22
import com .navercorp .pinpoint .rpc .util .MapUtils ;
23
23
import com .navercorp .pinpoint .web .security .ServerMapDataFilter ;
28
28
import com .navercorp .pinpoint .web .websocket .message .PinpointWebSocketMessage ;
29
29
import com .navercorp .pinpoint .web .websocket .message .PinpointWebSocketMessageConverter ;
30
30
import com .navercorp .pinpoint .web .websocket .message .PinpointWebSocketMessageType ;
31
- import com .navercorp .pinpoint .web .websocket .message .PongMessage ;
32
31
import com .navercorp .pinpoint .web .websocket .message .RequestMessage ;
33
32
import org .apache .commons .lang3 .StringUtils ;
34
- import org .apache .logging .log4j .Logger ;
35
33
import org .apache .logging .log4j .LogManager ;
34
+ import org .apache .logging .log4j .Logger ;
36
35
import org .springframework .beans .factory .annotation .Autowired ;
36
+ import org .springframework .lang .NonNull ;
37
37
import org .springframework .web .socket .CloseStatus ;
38
38
import org .springframework .web .socket .TextMessage ;
39
39
import org .springframework .web .socket .WebSocketSession ;
@@ -84,7 +84,7 @@ public class ActiveThreadCountHandler extends TextWebSocketHandler implements Pi
84
84
85
85
private Timer reactiveTimer ;
86
86
87
- @ Autowired (required = false )
87
+ @ Autowired (required = false )
88
88
ServerMapDataFilter serverMapDataFilter ;
89
89
90
90
@ Autowired (required = false )
@@ -154,6 +154,11 @@ public String getRequestMapping() {
154
154
return requestMapping ;
155
155
}
156
156
157
+ @ Override
158
+ public int getPriority () {
159
+ return 0 ;
160
+ }
161
+
157
162
private WebSocketSessionContext getSessionContext (WebSocketSession webSocketSession ) {
158
163
final WebSocketSessionContext sessionContext = WebSocketSessionContext .getSessionContext (webSocketSession );
159
164
if (sessionContext == null ) {
@@ -163,7 +168,7 @@ private WebSocketSessionContext getSessionContext(WebSocketSession webSocketSess
163
168
}
164
169
165
170
@ Override
166
- public void afterConnectionEstablished (WebSocketSession newSession ) throws Exception {
171
+ public void afterConnectionEstablished (@ NonNull WebSocketSession newSession ) throws Exception {
167
172
logger .info ("ConnectionEstablished. session:{}" , newSession );
168
173
169
174
synchronized (lock ) {
@@ -179,7 +184,7 @@ public void afterConnectionEstablished(WebSocketSession newSession) throws Excep
179
184
}
180
185
181
186
@ Override
182
- public void afterConnectionClosed (WebSocketSession closeSession , CloseStatus status ) throws Exception {
187
+ public void afterConnectionClosed (@ NonNull WebSocketSession closeSession , @ NonNull CloseStatus status ) throws Exception {
183
188
logger .info ("ConnectionClose. session:{}, caused:{}" , closeSession , status );
184
189
185
190
final WebSocketSessionContext sessionContext = getSessionContext (closeSession );
@@ -196,7 +201,7 @@ public void afterConnectionClosed(WebSocketSession closeSession, CloseStatus sta
196
201
}
197
202
198
203
@ Override
199
- protected void handleTextMessage (WebSocketSession webSocketSession , TextMessage message ) throws Exception {
204
+ protected void handleTextMessage (@ NonNull WebSocketSession webSocketSession , TextMessage message ) throws Exception {
200
205
logger .info ("handleTextMessage. session:{}, remote:{}, message:{}." , webSocketSession , webSocketSession .getRemoteAddress (), message .getPayload ());
201
206
202
207
PinpointWebSocketMessage webSocketMessage = messageConverter .getWebSocketMessage (message .getPayload ());
@@ -206,7 +211,7 @@ protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage
206
211
handleRequestMessage0 (webSocketSession , (RequestMessage ) webSocketMessage );
207
212
break ;
208
213
case PONG :
209
- handlePongMessage0 (webSocketSession , ( PongMessage ) webSocketMessage );
214
+ handlePongMessage0 (webSocketSession );
210
215
break ;
211
216
default :
212
217
logger .warn ("Unexpected WebSocketMessageType received. messageType:{}." , webSocketMessageType );
@@ -233,18 +238,22 @@ private void handleRequestMessage0(WebSocketSession webSocketSession, RequestMes
233
238
private void handleActiveThreadCount (WebSocketSession webSocketSession , RequestMessage requestMessage ) {
234
239
final String applicationName = MapUtils .getString (requestMessage .getParameters (), APPLICATION_NAME_KEY );
235
240
if (applicationName != null ) {
236
- final WebSocketSessionContext sessionContext = getSessionContext (webSocketSession );
237
- synchronized (lock ) {
238
- if (StringUtils .equals (applicationName , sessionContext .getApplicationName ())) {
239
- return ;
240
- }
241
+ handleActiveThreadCount (webSocketSession , applicationName );
242
+ }
243
+ }
241
244
242
- unbindingResponseAggregator (webSocketSession , sessionContext );
243
- if (webSocketSession .isOpen ()) {
244
- bindingResponseAggregator (webSocketSession , sessionContext , applicationName );
245
- } else {
246
- logger .warn ("WebSocketSession is not opened. skip binding." );
247
- }
245
+ protected void handleActiveThreadCount (WebSocketSession webSocketSession , String applicationName ) {
246
+ final WebSocketSessionContext sessionContext = getSessionContext (webSocketSession );
247
+ synchronized (lock ) {
248
+ if (StringUtils .equals (applicationName , sessionContext .getApplicationName ())) {
249
+ return ;
250
+ }
251
+
252
+ unbindingResponseAggregator (webSocketSession , sessionContext );
253
+ if (webSocketSession .isOpen ()) {
254
+ bindingResponseAggregator (webSocketSession , sessionContext , applicationName );
255
+ } else {
256
+ logger .warn ("WebSocketSession is not opened. skip binding." );
248
257
}
249
258
}
250
259
}
@@ -257,13 +266,13 @@ private void closeSession(WebSocketSession session, CloseStatus status) {
257
266
}
258
267
}
259
268
260
- private void handlePongMessage0 (WebSocketSession webSocketSession , PongMessage pongMessage ) {
269
+ private void handlePongMessage0 (WebSocketSession webSocketSession ) {
261
270
final WebSocketSessionContext sessionContext = getSessionContext (webSocketSession );
262
271
sessionContext .changeHealthCheckSuccess ();
263
272
}
264
273
265
274
@ Override
266
- protected void handlePongMessage (WebSocketSession webSocketSession , org .springframework .web .socket .PongMessage message ) throws Exception {
275
+ protected void handlePongMessage (@ NonNull WebSocketSession webSocketSession , org .springframework .web .socket .PongMessage message ) throws Exception {
267
276
logger .info ("handlePongMessage. session:{}, remote:{}, message:{}." , webSocketSession , webSocketSession .getRemoteAddress (), message .getPayload ());
268
277
269
278
super .handlePongMessage (webSocketSession , message );
0 commit comments