Skip to content

Commit c2785d6

Browse files
committed
[#11497] Fix DirectByteBuffer leak in active thread count
1 parent c540794 commit c2785d6

26 files changed

+411
-456
lines changed

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/ActiveThreadCountStreamSocket.java

+6
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,10 @@ public ClientResponseObserver<PCmdActiveThreadCountRes, Empty> getResponseObserv
100100
return clientResponseObserver;
101101
}
102102

103+
@Override
104+
public String toString() {
105+
return "ActiveThreadCountStreamSocket{" +
106+
"streamObserverId=" + streamObserverId +
107+
'}';
108+
}
103109
}

agent-module/profiler/src/main/java/com/navercorp/pinpoint/profiler/receiver/grpc/GrpcActiveThreadCountService.java

+3
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ public void run() {
108108
PCmdActiveThreadCountRes activeThreadCount = activeThreadCountResponseBuilder.build();
109109

110110
stream.send(activeThreadCount);
111+
if (isDebug) {
112+
LOGGER.debug("ActiveThreadCountStreamSocket. {}", stream);
113+
}
111114
} catch (Throwable e) {
112115
LOGGER.warn("failed to execute ActiveThreadCountTimerTask.run method. streamSocket:{}, message:{}", streamSocket, e.getMessage(), e);
113116
streamSocket.close(e);

collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ public void run() throws Exception {
6464

6565
MetadataService metadataService = new MetadataService(new MockDispatchHandler(), Executors.newFixedThreadPool(8), serverRequestFactory);
6666
List<ServerServiceDefinition> serviceList = List.of(agentService.bindService(), metadataService.bindService());
67+
68+
grpcReceiver.setBindAddress(builder.build());
69+
grpcReceiver.setAddressFilter(new MockAddressFilter());
70+
6771
grpcReceiver.setBindableServiceList(serviceList);
6872
grpcReceiver.setAddressFilter(new MockAddressFilter());
6973
grpcReceiver.setExecutor(Executors.newFixedThreadPool(8));

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/RealtimeCollectorReceiverConfig.java

+8-13
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,17 @@
1515
*/
1616
package com.navercorp.pinpoint.realtime.collector.receiver;
1717

18-
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
19-
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadDumpRes;
20-
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadLightDumpRes;
21-
import com.navercorp.pinpoint.grpc.trace.PCmdEchoResponse;
2218
import com.navercorp.pinpoint.realtime.collector.receiver.grpc.GrpcAgentConnectionRepository;
2319
import com.navercorp.pinpoint.realtime.collector.receiver.grpc.GrpcCommandService;
24-
import com.navercorp.pinpoint.realtime.collector.sink.ErrorSinkRepository;
20+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
21+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadDumpPublisher;
22+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadLightDumpPublisher;
23+
import com.navercorp.pinpoint.realtime.collector.sink.EchoPublisher;
2524
import com.navercorp.pinpoint.realtime.collector.sink.RealtimeCollectorSinkConfig;
2625
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
2726
import org.springframework.context.annotation.Bean;
2827
import org.springframework.context.annotation.Configuration;
2928
import org.springframework.context.annotation.Import;
30-
import reactor.core.publisher.FluxSink;
31-
import reactor.core.publisher.MonoSink;
3229

3330
/**
3431
* @author youngjin.kim2
@@ -45,15 +42,13 @@ GrpcAgentConnectionRepository grpcAgentConnectionRepository() {
4542
@Bean("commandService")
4643
GrpcCommandService grpcCommandService(
4744
GrpcAgentConnectionRepository agentConnectionRepository,
48-
ErrorSinkRepository errorSinkRepository,
49-
SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepository,
50-
SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepository,
51-
SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepository,
52-
SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepository
45+
SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepository,
46+
SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepository,
47+
SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepository,
48+
SinkRepository<EchoPublisher> echoSinkRepository
5349
) {
5450
return new GrpcCommandService(
5551
agentConnectionRepository,
56-
errorSinkRepository,
5752
activeThreadCountSinkRepository,
5853
activeThreadDumpSinkRepository,
5954
activeThreadLightDumpSinkRepository,
Original file line numberDiff line numberDiff line change
@@ -16,72 +16,73 @@
1616
package com.navercorp.pinpoint.realtime.collector.receiver.grpc;
1717

1818
import com.google.protobuf.Empty;
19+
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCountRes;
20+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
1921
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
2022
import io.grpc.Status;
2123
import io.grpc.StatusException;
2224
import io.grpc.stub.ServerCallStreamObserver;
2325
import io.grpc.stub.StreamObserver;
2426
import org.apache.logging.log4j.LogManager;
2527
import org.apache.logging.log4j.Logger;
26-
import reactor.core.publisher.FluxSink;
2728

2829
import java.util.Objects;
2930

3031
/**
3132
* @author youngjin.kim2
3233
*/
33-
public abstract class FluxCommandResponseObserver<T> implements StreamObserver<T> {
34+
public abstract class ActiveThreadCountResponseStreamObserver implements StreamObserver<PCmdActiveThreadCountRes> {
3435

35-
private static final Logger logger = LogManager.getLogger(FluxCommandResponseObserver.class);
36+
private static final Logger logger = LogManager.getLogger(ActiveThreadCountResponseStreamObserver.class);
3637

37-
private final ServerCallStreamObserver<Empty> connectionObserver;
38-
private final SinkRepository<FluxSink<T>> sinkRepository;
38+
private final ServerCallStreamObserver<Empty> serverCallStreamObserver;
39+
private final SinkRepository<ActiveThreadCountPublisher> sinkRepository;
3940

4041
private volatile long sinkId = -1;
41-
private volatile FluxSink<T> sink = null;
42+
private volatile ActiveThreadCountPublisher publisher = null;
4243

43-
public FluxCommandResponseObserver(
44-
ServerCallStreamObserver<Empty> connectionObserver,
45-
SinkRepository<FluxSink<T>> sinkRepository
46-
) {
47-
this.connectionObserver = Objects.requireNonNull(connectionObserver, "connectionObserver");
44+
public ActiveThreadCountResponseStreamObserver(ServerCallStreamObserver<Empty> serverCallStreamObserver, SinkRepository<ActiveThreadCountPublisher> sinkRepository) {
45+
this.serverCallStreamObserver = Objects.requireNonNull(serverCallStreamObserver, "serverCallStreamObserver");
4846
this.sinkRepository = Objects.requireNonNull(sinkRepository, "sinkRepository");
4947
}
5048

5149
@Override
52-
public void onNext(T response) {
50+
public void onNext(PCmdActiveThreadCountRes response) {
5351
boolean isHello = extractSequence(response) == 1;
5452

5553
if (isHello) {
56-
connectionObserver.onNext(Empty.getDefaultInstance());
54+
serverCallStreamObserver.onNext(Empty.getDefaultInstance());
5755
}
5856

59-
if (ensureSink(response) == null) {
60-
this.connectionObserver.onError(new StatusException(Status.INTERNAL.withDescription("sink not found")));
57+
final ActiveThreadCountPublisher publisher = ensureSink(response);
58+
if (publisher == null) {
59+
this.serverCallStreamObserver.onError(new StatusException(Status.INTERNAL.withDescription("sink not found")));
6160
return;
6261
}
6362

6463
logger.debug("Realtime flux item received: sinkId = {}", sinkId);
6564
if (!isHello) {
66-
this.sink.next(response);
65+
publisher.publish(response);
6766
}
6867
}
6968

70-
private FluxSink<T> ensureSink(T response) {
71-
if (this.sinkId == -1 || this.sink == null) {
69+
private ActiveThreadCountPublisher ensureSink(PCmdActiveThreadCountRes response) {
70+
if (this.sinkId == -1 || publisher == null) {
7271
return initSink(response);
7372
}
74-
return this.sink;
73+
return this.publisher;
7574
}
7675

77-
private FluxSink<T> initSink(T response) {
76+
private ActiveThreadCountPublisher initSink(PCmdActiveThreadCountRes response) {
7877
this.sinkId = this.extractSinkId(response);
79-
this.sink = this.sinkRepository.get(this.sinkId);
80-
if (this.sink == null) {
78+
this.publisher = this.sinkRepository.get(sinkId);
79+
if (this.publisher == null) {
8180
logger.warn("Failed to handle realtime flux item: sink {} not found", this.sinkId);
8281
return null;
82+
} else {
83+
publisher.setStreamObserver(this.serverCallStreamObserver);
8384
}
84-
return this.sink;
85+
return publisher;
8586
}
8687

8788
@Override
@@ -93,10 +94,10 @@ public void onError(Throwable t) {
9394
logger.warn("Stream error: sinkId = {}, {}", sinkId, status);
9495
}
9596

96-
this.connectionObserver.onCompleted();
97+
this.serverCallStreamObserver.onCompleted();
9798

98-
if (this.sink != null) {
99-
this.sink.error(t);
99+
if (this.publisher != null) {
100+
this.publisher.error(t);
100101
} else {
101102
logger.warn("Failed to emit error: sink not found. the error may have occurred before the first message");
102103
}
@@ -106,17 +107,17 @@ public void onError(Throwable t) {
106107
public void onCompleted() {
107108
logger.info("Completed stream: sinkId = {}", this.sinkId);
108109

109-
this.connectionObserver.onCompleted();
110+
this.serverCallStreamObserver.onCompleted();
110111

111-
if (this.sink != null) {
112-
this.sink.complete();
112+
if (this.publisher != null) {
113+
this.publisher.complete();
113114
} else {
114115
logger.warn("Failed to emit complete: sink not found");
115116
}
116117
}
117118

118-
protected abstract long extractSinkId(T response);
119+
protected abstract long extractSinkId(PCmdActiveThreadCountRes response);
119120

120-
protected abstract int extractSequence(T response);
121+
protected abstract int extractSequence(PCmdActiveThreadCountRes response);
121122

122123
}

realtime/realtime-collector/src/main/java/com/navercorp/pinpoint/realtime/collector/receiver/grpc/GrpcCommandService.java

+37-35
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@
2929
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
3030
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
3131
import com.navercorp.pinpoint.grpc.trace.ProfilerCommandServiceGrpc;
32-
import com.navercorp.pinpoint.realtime.collector.sink.ErrorSinkRepository;
32+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadCountPublisher;
33+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadDumpPublisher;
34+
import com.navercorp.pinpoint.realtime.collector.sink.ActiveThreadLightDumpPublisher;
35+
import com.navercorp.pinpoint.realtime.collector.sink.EchoPublisher;
36+
import com.navercorp.pinpoint.realtime.collector.sink.Publisher;
3337
import com.navercorp.pinpoint.realtime.collector.sink.SinkRepository;
3438
import io.grpc.Metadata;
3539
import io.grpc.Status;
@@ -38,8 +42,6 @@
3842
import io.grpc.stub.StreamObserver;
3943
import org.apache.logging.log4j.LogManager;
4044
import org.apache.logging.log4j.Logger;
41-
import reactor.core.publisher.FluxSink;
42-
import reactor.core.publisher.MonoSink;
4345

4446
import java.net.InetSocketAddress;
4547
import java.util.List;
@@ -54,22 +56,19 @@ public class GrpcCommandService extends ProfilerCommandServiceGrpc.ProfilerComma
5456
private final Logger logger = LogManager.getLogger(this.getClass());
5557

5658
private final GrpcAgentConnectionRepository agentConnectionRepository;
57-
private final ErrorSinkRepository sinkRepository;
58-
private final SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepo;
59-
private final SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepo;
60-
private final SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepo;
61-
private final SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepo;
59+
private final SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepo;
60+
private final SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepo;
61+
private final SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepo;
62+
private final SinkRepository<EchoPublisher> echoSinkRepo;
6263

6364
public GrpcCommandService(
6465
GrpcAgentConnectionRepository agentConnectionRepository,
65-
ErrorSinkRepository sinkRepository,
66-
SinkRepository<FluxSink<PCmdActiveThreadCountRes>> activeThreadCountSinkRepo,
67-
SinkRepository<MonoSink<PCmdActiveThreadDumpRes>> activeThreadDumpSinkRepo,
68-
SinkRepository<MonoSink<PCmdActiveThreadLightDumpRes>> activeThreadLightDumpSinkRepo,
69-
SinkRepository<MonoSink<PCmdEchoResponse>> echoSinkRepo
66+
SinkRepository<ActiveThreadCountPublisher> activeThreadCountSinkRepo,
67+
SinkRepository<ActiveThreadDumpPublisher> activeThreadDumpSinkRepo,
68+
SinkRepository<ActiveThreadLightDumpPublisher> activeThreadLightDumpSinkRepo,
69+
SinkRepository<EchoPublisher> echoSinkRepo
7070
) {
7171
this.agentConnectionRepository = Objects.requireNonNull(agentConnectionRepository, "clusterPointRepository");
72-
this.sinkRepository = Objects.requireNonNull(sinkRepository, "sinkRepository");
7372
this.activeThreadCountSinkRepo = Objects.requireNonNull(activeThreadCountSinkRepo, "activeThreadCountSinkRepo");
7473
this.activeThreadDumpSinkRepo = Objects.requireNonNull(activeThreadDumpSinkRepo, "activeThreadDumpSinkRepo");
7574
this.activeThreadLightDumpSinkRepo = Objects.requireNonNull(activeThreadLightDumpSinkRepo, "activeThreadLightDumpSinkRepo");
@@ -127,9 +126,7 @@ public void onNext(PCmdMessage value) {
127126
GrpcCommandService.this.agentConnectionRepository.add(conn);
128127
}
129128
} else if (value.hasFailMessage()) {
130-
PCmdResponse failMessage = value.getFailMessage();
131-
long sinkId = failMessage.getResponseId();
132-
sinkRepository.error(sinkId, new RuntimeException(failMessage.getMessage().getValue()));
129+
handleFail(value);
133130
}
134131
}
135132

@@ -199,9 +196,7 @@ public StreamObserver<PCmdMessage> handleCommandV2(StreamObserver<PCmdRequest> r
199196
@Override
200197
public void onNext(PCmdMessage value) {
201198
if (value.hasFailMessage()) {
202-
PCmdResponse failMessage = value.getFailMessage();
203-
long sinkId = failMessage.getResponseId();
204-
sinkRepository.error(sinkId, new RuntimeException(failMessage.getMessage().getValue()));
199+
handleFail(value);
205200
}
206201
}
207202

@@ -232,6 +227,16 @@ private GrpcAgentConnection buildAgentConnection(
232227
);
233228
}
234229

230+
private void handleFail(PCmdMessage value) {
231+
final PCmdResponse failMessage = value.getFailMessage();
232+
final long sinkId = failMessage.getResponseId();
233+
final Exception exception = new RuntimeException(failMessage.getMessage().getValue());
234+
activeThreadCountSinkRepo.error(sinkId, exception);
235+
activeThreadDumpSinkRepo.error(sinkId, exception);
236+
activeThreadLightDumpSinkRepo.error(sinkId, exception);
237+
echoSinkRepo.error(sinkId, exception);
238+
}
239+
235240
private void handleOnError(Throwable t, GrpcAgentConnection conn) {
236241
if (conn == null) {
237242
logger.warn("Command error before establishment");
@@ -242,7 +247,7 @@ private void handleOnError(Throwable t, GrpcAgentConnection conn) {
242247
Metadata metadata = Status.trailersFromThrowable(t);
243248

244249
logger.info("Failed to command stream, {} => local, {} {}",
245-
conn.getClusterKey(), status, metadata);
250+
conn.getClusterKey(), status, metadata);
246251

247252
}
248253

@@ -258,33 +263,31 @@ private void handleOnCompleted(GrpcAgentConnection conn) {
258263
@Override
259264
public void commandEcho(PCmdEchoResponse response, StreamObserver<Empty> responseObserver) {
260265
long sinkId = response.getCommonResponse().getResponseId();
261-
emitMono(response, responseObserver, this.echoSinkRepo.get(sinkId));
266+
final EchoPublisher publisher = this.echoSinkRepo.get(sinkId);
267+
emitMono(response, responseObserver, publisher);
262268
this.echoSinkRepo.invalidate(sinkId);
263269
}
264270

265271
@Override
266272
public void commandActiveThreadDump(PCmdActiveThreadDumpRes response, StreamObserver<Empty> responseObserver) {
267273
long sinkId = response.getCommonResponse().getResponseId();
268-
emitMono(response, responseObserver, this.activeThreadDumpSinkRepo.get(sinkId));
274+
final ActiveThreadDumpPublisher publisher = this.activeThreadDumpSinkRepo.get(sinkId);
275+
emitMono(response, responseObserver, publisher);
269276
this.activeThreadDumpSinkRepo.invalidate(sinkId);
270277
}
271278

272279
@Override
273-
public void commandActiveThreadLightDump(
274-
PCmdActiveThreadLightDumpRes response,
275-
StreamObserver<Empty> responseObserver
276-
) {
280+
public void commandActiveThreadLightDump(PCmdActiveThreadLightDumpRes response, StreamObserver<Empty> responseObserver) {
277281
long sinkId = response.getCommonResponse().getResponseId();
278-
emitMono(response, responseObserver, this.activeThreadLightDumpSinkRepo.get(sinkId));
282+
final ActiveThreadLightDumpPublisher publisher = this.activeThreadLightDumpSinkRepo.get(sinkId);
283+
emitMono(response, responseObserver, publisher);
279284
this.activeThreadLightDumpSinkRepo.invalidate(sinkId);
280285
}
281286

282287
@Override
283-
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(
284-
StreamObserver<Empty> responseObserver
285-
) {
288+
public StreamObserver<PCmdActiveThreadCountRes> commandStreamActiveThreadCount(StreamObserver<Empty> responseObserver) {
286289
ServerCallStreamObserver<Empty> serverResponseObserver = (ServerCallStreamObserver<Empty>) responseObserver;
287-
return new FluxCommandResponseObserver<>(serverResponseObserver, this.activeThreadCountSinkRepo) {
290+
return new ActiveThreadCountResponseStreamObserver(serverResponseObserver, this.activeThreadCountSinkRepo) {
288291
@Override
289292
protected long extractSinkId(PCmdActiveThreadCountRes response) {
290293
return response.getCommonStreamResponse().getResponseId();
@@ -297,13 +300,13 @@ protected int extractSequence(PCmdActiveThreadCountRes response) {
297300
};
298301
}
299302

300-
private <T> void emitMono(T response, StreamObserver<Empty> responseObserver, MonoSink<T> sink) {
303+
private <T> void emitMono(T response, StreamObserver<Empty> responseObserver, Publisher<T> sink) {
301304
if (sink == null) {
302305
logger.warn("Could not find echo sink: clusterKey = {}", getClusterKeyFromContext());
303306
responseObserver.onError(new StatusException(Status.NOT_FOUND));
304307
return;
305308
}
306-
sink.success(response);
309+
sink.publish(response);
307310
responseObserver.onNext(Empty.getDefaultInstance());
308311
responseObserver.onCompleted();
309312
}
@@ -356,5 +359,4 @@ public void onCompleted() {
356359
}
357360

358361
}
359-
360362
}

0 commit comments

Comments
 (0)