Skip to content

Commit 26f8c8b

Browse files
committed
[#9932] Replace with Netty4
1 parent 535d046 commit 26f8c8b

File tree

10 files changed

+250
-71
lines changed

10 files changed

+250
-71
lines changed

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/PinpointGrpcServer.java

+14-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.protobuf.GeneratedMessageV3;
2121
import com.navercorp.pinpoint.collector.cluster.GrpcAgentConnection;
2222
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
23+
import com.navercorp.pinpoint.collector.util.RequestManager;
2324
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
2425
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
2526
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCount;
@@ -30,11 +31,9 @@
3031
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
3132
import com.navercorp.pinpoint.io.ResponseMessage;
3233
import com.navercorp.pinpoint.rpc.PinpointSocketException;
33-
import com.navercorp.pinpoint.rpc.client.RequestManager;
3434
import com.navercorp.pinpoint.rpc.common.SocketState;
3535
import com.navercorp.pinpoint.rpc.common.SocketStateChangeResult;
3636
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
37-
import com.navercorp.pinpoint.rpc.packet.ResponsePacket;
3837
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
3938
import com.navercorp.pinpoint.rpc.packet.stream.StreamResponsePacket;
4039
import com.navercorp.pinpoint.rpc.stream.ClientStreamChannel;
@@ -79,13 +78,13 @@ public class PinpointGrpcServer {
7978

8079
private final InetSocketAddress remoteAddress;
8180
private final ClusterKey clusterKey;
82-
private final RequestManager requestManager;
81+
private final RequestManager<ResponseMessage> requestManager;
8382
private final ProfilerClusterManager profilerClusterManager;
8483
private final StreamObserver<PCmdRequest> requestObserver;
8584

86-
private Runnable onCloseHandler;
85+
private volatile Runnable onCloseHandler;
8786

88-
public PinpointGrpcServer(InetSocketAddress remoteAddress, ClusterKey clusterKey, RequestManager requestManager, ProfilerClusterManager profilerClusterManager, StreamObserver<PCmdRequest> requestObserver) {
87+
public PinpointGrpcServer(InetSocketAddress remoteAddress, ClusterKey clusterKey, RequestManager<ResponseMessage> requestManager, ProfilerClusterManager profilerClusterManager, StreamObserver<PCmdRequest> requestObserver) {
8988
this.remoteAddress = Objects.requireNonNull(remoteAddress, "remoteAddress");
9089
this.clusterKey = Objects.requireNonNull(clusterKey, "clusterKey");
9190
this.requestManager = Objects.requireNonNull(requestManager, "requestManager");
@@ -207,14 +206,20 @@ public void handleMessage(int responseId, GeneratedMessageV3 message) {
207206
if (isInfo) {
208207
logger.info("{} handleMessage:{}", clusterKey, MessageFormatUtils.debugLog(message));
209208
}
210-
TBase<?, ?> tMessage = messageConverter.toMessage(message);
211209

210+
final CompletableFuture<ResponseMessage> responseFuture = requestManager.messageReceived(responseId, clusterKey::format);
211+
if (responseFuture == null) {
212+
logger.warn("Response future is timeout responseId:{}", responseId);
213+
return;
214+
}
212215
try {
216+
TBase<?, ?> tMessage = messageConverter.toMessage(message);
213217
byte[] serialize = SerializationUtils.serialize(tMessage, commandHeaderTBaseSerializerFactory);
214-
ResponsePacket responsePacket = new ResponsePacket(responseId, serialize);
215-
requestManager.messageReceived(responsePacket, clusterKey.format());
218+
219+
ResponseMessage responseMessage = ResponseMessage.wrap(serialize);
220+
responseFuture.complete(responseMessage);
216221
} catch (TException e) {
217-
setFailMessageToFuture(responseId, e.getMessage());
222+
responseFuture.completeExceptionally(new PinpointSocketException(e.getMessage()));
218223
}
219224
}
220225

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/service/command/GrpcCommandService.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
package com.navercorp.pinpoint.collector.receiver.grpc.service.command;
1818

19+
import com.google.protobuf.Empty;
1920
import com.navercorp.pinpoint.collector.cluster.ClusterService;
2021
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
2122
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServer;
2223
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServerRepository;
24+
import com.navercorp.pinpoint.collector.util.RequestManager;
25+
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
2326
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
2427
import com.navercorp.pinpoint.grpc.Header;
2528
import com.navercorp.pinpoint.grpc.StatusError;
@@ -34,17 +37,15 @@
3437
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
3538
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
3639
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
37-
import com.navercorp.pinpoint.rpc.client.RequestManager;
38-
import com.navercorp.pinpoint.rpc.util.TimerFactory;
39-
40-
import com.google.protobuf.Empty;
40+
import com.navercorp.pinpoint.io.ResponseMessage;
4141
import io.grpc.Status;
4242
import io.grpc.StatusException;
4343
import io.grpc.stub.ServerCallStreamObserver;
4444
import io.grpc.stub.StreamObserver;
45-
import org.jboss.netty.util.Timer;
46-
import org.apache.logging.log4j.Logger;
45+
import io.netty.util.HashedWheelTimer;
46+
import io.netty.util.Timer;
4747
import org.apache.logging.log4j.LogManager;
48+
import org.apache.logging.log4j.Logger;
4849

4950
import java.io.Closeable;
5051
import java.io.IOException;
@@ -73,7 +74,13 @@ public class GrpcCommandService extends ProfilerCommandServiceGrpc.ProfilerComma
7374
public GrpcCommandService(ClusterService clusterService) {
7475
Objects.requireNonNull(clusterService, "clusterService");
7576
this.profilerClusterManager = Objects.requireNonNull(clusterService.getProfilerClusterManager(), "profilerClusterManager");
76-
this.timer = TimerFactory.createHashedWheelTimer("GrpcCommandService-Timer", 100, TimeUnit.MILLISECONDS, 512);
77+
this.timer = newTimer();
78+
79+
}
80+
81+
private Timer newTimer() {
82+
final PinpointThreadFactory threadFactory = new PinpointThreadFactory("GrpcCommandService-Timer", true);
83+
return new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
7784
}
7885

7986
@Override
@@ -213,7 +220,7 @@ private void unregisterPinpointGrpcServer(Long transportId) {
213220
}
214221

215222
private PinpointGrpcServer createPinpointGrpcServer(StreamObserver<PCmdRequest> requestObserver, ClusterKey clusterKey) {
216-
final RequestManager requestManager = new RequestManager(timer, 3000);
223+
final RequestManager<ResponseMessage> requestManager = new RequestManager<>(timer, 3000);
217224
return new PinpointGrpcServer(getRemoteAddress(), clusterKey, requestManager, profilerClusterManager, requestObserver);
218225
}
219226

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.navercorp.pinpoint.collector.util;
2+
3+
import java.util.Objects;
4+
import java.util.function.BiConsumer;
5+
import java.util.function.IntConsumer;
6+
7+
public class RequestFailListener<RES> implements BiConsumer<RES, Throwable> {
8+
private final int requestId;
9+
private final IntConsumer consumer;
10+
11+
public RequestFailListener(int requestId, IntConsumer consumer) {
12+
this.requestId = requestId;
13+
this.consumer = Objects.requireNonNull(consumer, "consumer");
14+
}
15+
16+
@Override
17+
public void accept(RES res, Throwable throwable) {
18+
if (throwable != null) {
19+
consumer.accept(requestId);
20+
}
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package com.navercorp.pinpoint.collector.util;
2+
3+
4+
import com.navercorp.pinpoint.rpc.PinpointSocketException;
5+
import io.netty.util.Timeout;
6+
import io.netty.util.Timer;
7+
import io.netty.util.TimerTask;
8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
10+
11+
import java.util.Map;
12+
import java.util.Objects;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.ConcurrentMap;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.TimeoutException;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import java.util.function.BiConsumer;
20+
import java.util.function.Supplier;
21+
22+
/**
23+
* @author emeroad
24+
*/
25+
public class RequestManager<RES> {
26+
27+
private final Logger logger = LogManager.getLogger(this.getClass());
28+
29+
private final AtomicInteger requestId = new AtomicInteger(1);
30+
31+
private final ConcurrentMap<Integer, CompletableFuture<RES>> requestMap = new ConcurrentHashMap<>();
32+
// Have to move Timer into factory?
33+
private final Timer timer;
34+
private final long defaultTimeoutMillis;
35+
36+
public RequestManager(Timer timer, long defaultTimeoutMillis) {
37+
this.timer = Objects.requireNonNull(timer, "timer");
38+
39+
if (defaultTimeoutMillis <= 0) {
40+
throw new IllegalArgumentException("defaultTimeoutMillis must greater than zero.");
41+
}
42+
this.defaultTimeoutMillis = defaultTimeoutMillis;
43+
}
44+
45+
private BiConsumer<RES, Throwable> createFailureEventHandler(final int requestId) {
46+
return new RequestFailListener<>(requestId, this::removeMessageFuture);
47+
}
48+
49+
private void addTimeoutTask(CompletableFuture<RES> future, long timeoutMillis) {
50+
Objects.requireNonNull(future, "future");
51+
52+
try {
53+
Timeout timeout = timer.newTimeout(new TimerTask() {
54+
@Override
55+
public void run(Timeout timeout) throws Exception {
56+
if (timeout.isCancelled()) {
57+
return;
58+
}
59+
if (future.isDone()) {
60+
return;
61+
}
62+
future.completeExceptionally(new TimeoutException("Timeout by RequestManager-TIMER"));
63+
}
64+
}, timeoutMillis, TimeUnit.MILLISECONDS);
65+
future.thenAccept(t -> timeout.cancel());
66+
} catch (IllegalStateException e) {
67+
// this case is that timer has been shutdown. That maybe just means that socket has been closed.
68+
future.completeExceptionally(new PinpointSocketException("socket closed")) ;
69+
}
70+
}
71+
72+
public int nextRequestId() {
73+
return this.requestId.getAndIncrement();
74+
}
75+
76+
77+
public CompletableFuture<RES> messageReceived(int responseId, Supplier<Object> sourceName) {
78+
final CompletableFuture<RES> future = removeMessageFuture(responseId);
79+
if (future == null) {
80+
logger.warn("ResponseFuture not found. responseId:{}, sourceName:{}", responseId, sourceName.get());
81+
return null;
82+
}
83+
if (logger.isDebugEnabled()) {
84+
logger.debug("ResponsePacket is arrived responseId:{}, sourceName:{}", responseId, sourceName.get());
85+
}
86+
return future;
87+
}
88+
89+
90+
public CompletableFuture<RES> removeMessageFuture(int requestId) {
91+
return this.requestMap.remove(requestId);
92+
}
93+
94+
public CompletableFuture<RES> register(int requestId) {
95+
return register(requestId, defaultTimeoutMillis);
96+
}
97+
98+
public CompletableFuture<RES> register(int requestId, long timeoutMillis) {
99+
// shutdown check
100+
final CompletableFuture<RES> responseFuture = new CompletableFuture<>();
101+
102+
final CompletableFuture<RES> old = this.requestMap.put(requestId, responseFuture);
103+
if (old != null) {
104+
throw new PinpointSocketException("unexpected error. old future exist:" + old + " id:" + requestId);
105+
}
106+
// when future fails, put a handle in order to remove a failed future in the requestMap.
107+
BiConsumer<RES, Throwable> removeTable = createFailureEventHandler(requestId);
108+
responseFuture.whenComplete(removeTable);
109+
110+
addTimeoutTask(responseFuture, timeoutMillis);
111+
return responseFuture;
112+
}
113+
114+
public void close() {
115+
logger.debug("close()");
116+
final PinpointSocketException closed = new PinpointSocketException("socket closed");
117+
118+
int requestFailCount = 0;
119+
for (Map.Entry<Integer, CompletableFuture<RES>> entry : requestMap.entrySet()) {
120+
if (entry.getValue().completeExceptionally(closed)) {
121+
requestFailCount++;
122+
}
123+
}
124+
this.requestMap.clear();
125+
if (requestFailCount > 0) {
126+
logger.info("Close fail count:{}", requestFailCount);
127+
}
128+
129+
}
130+
131+
}

collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/PinpointGrpcServerTest.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,18 @@
1818

1919
import com.google.protobuf.StringValue;
2020
import com.navercorp.pinpoint.collector.cluster.ProfilerClusterManager;
21+
import com.navercorp.pinpoint.collector.util.RequestManager;
22+
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
2123
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
2224
import com.navercorp.pinpoint.grpc.trace.PCmdEcho;
2325
import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse;
2426
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
2527
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
2628
import com.navercorp.pinpoint.io.ResponseMessage;
27-
import com.navercorp.pinpoint.rpc.client.RequestManager;
2829
import com.navercorp.pinpoint.rpc.common.SocketStateCode;
29-
import com.navercorp.pinpoint.rpc.util.TimerFactory;
3030
import com.navercorp.pinpoint.thrift.io.TCommandType;
31-
import org.jboss.netty.util.Timer;
31+
import io.netty.util.HashedWheelTimer;
32+
import io.netty.util.Timer;
3233
import org.junit.jupiter.api.AfterAll;
3334
import org.junit.jupiter.api.Assertions;
3435
import org.junit.jupiter.api.BeforeAll;
@@ -39,6 +40,7 @@
3940
import java.util.List;
4041
import java.util.concurrent.CompletableFuture;
4142
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.ThreadFactory;
4244
import java.util.concurrent.TimeUnit;
4345
import java.util.concurrent.TimeoutException;
4446

@@ -55,7 +57,8 @@ public class PinpointGrpcServerTest {
5557

5658
@BeforeAll
5759
public static void setUp() throws Exception {
58-
testTimer = TimerFactory.createHashedWheelTimer(PinpointGrpcServerTest.class + "-Timer", 100, TimeUnit.MILLISECONDS, 512);
60+
ThreadFactory threadFactory = new PinpointThreadFactory(PinpointGrpcServerTest.class + "-Timer", true);
61+
testTimer = new HashedWheelTimer(threadFactory, 100, TimeUnit.MILLISECONDS, 512);
5962
}
6063

6164
@AfterAll
@@ -69,7 +72,8 @@ public static void tearDown() throws Exception {
6972
public void stateTest() {
7073
RecordedStreamObserver recordedStreamObserver = new RecordedStreamObserver();
7174

72-
PinpointGrpcServer pinpointGrpcServer = new PinpointGrpcServer(Mockito.mock(InetSocketAddress.class), clusterKey, new RequestManager(testTimer, 3000), Mockito.mock(ProfilerClusterManager.class), recordedStreamObserver);
75+
RequestManager<ResponseMessage> requestManager = new RequestManager<>(testTimer, 3000);
76+
PinpointGrpcServer pinpointGrpcServer = new PinpointGrpcServer(Mockito.mock(InetSocketAddress.class), clusterKey, requestManager, Mockito.mock(ProfilerClusterManager.class), recordedStreamObserver);
7377
assertCurrentState(SocketStateCode.NONE, pinpointGrpcServer);
7478
CompletableFuture<ResponseMessage> future = pinpointGrpcServer.request(request);
7579
requestOnInvalidState(future, recordedStreamObserver);

commons-profiler/src/main/java/com/navercorp/pinpoint/io/DefaultResponseMessage.java

+7
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,11 @@ public DefaultResponseMessage(byte[] message) {
3232
public byte[] getMessage() {
3333
return message;
3434
}
35+
36+
@Override
37+
public String toString() {
38+
return "DefaultResponseMessage{" +
39+
"message.length=" + message.length +
40+
'}';
41+
}
3542
}

rpc/src/main/java/com/navercorp/pinpoint/rpc/client/DefaultPinpointClientHandler.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public class DefaultPinpointClientHandler extends SimpleChannelHandler implement
9292
private final MessageListener messageListener;
9393
private final ServerStreamChannelMessageHandler serverStreamChannelMessageHandler;
9494

95-
private final RequestManager requestManager;
95+
private final RequestManager<ResponseMessage> requestManager;
9696

9797
private final ChannelFutureListener pingWriteFailFutureListener = new WriteFailFutureListener(this.logger, "ping write fail.", "ping write success.");
9898
private final ChannelFutureListener sendWriteFailFutureListener = new WriteFailFutureListener(this.logger, "send() write fail.", "send() write success.");
@@ -120,7 +120,7 @@ public DefaultPinpointClientHandler(ConnectionFactory connectionFactory, SocketA
120120
this.socketAddressProvider = Objects.requireNonNull(socketAddressProvider, "socketAddressProvider");
121121

122122
this.channelTimer = Objects.requireNonNull(channelTimer, "channelTimer");
123-
this.requestManager = new RequestManager(channelTimer, clientOption.getRequestTimeoutMillis());
123+
this.requestManager = new RequestManager<>(channelTimer, clientOption.getRequestTimeoutMillis());
124124
this.clientOption = Objects.requireNonNull(clientOption, "clientOption");
125125

126126

@@ -372,7 +372,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex
372372
final short packetType = packet.getPacketType();
373373
switch (packetType) {
374374
case PacketType.APPLICATION_RESPONSE:
375-
this.requestManager.messageReceived((ResponsePacket) message, objectUniqName);
375+
ResponsePacket responsePacket = (ResponsePacket) message;
376+
this.requestManager.messageReceived(responsePacket, () -> ResponseMessage.wrap(responsePacket.getPayload()), objectUniqName::toString);
376377
return;
377378
// have to handle a request message through connector
378379
case PacketType.APPLICATION_REQUEST:

0 commit comments

Comments
 (0)