Skip to content

Commit 1278324

Browse files
committed
[pinpoint-apm#9932] Removed thrift dependency of Agent module
1 parent 455c291 commit 1278324

File tree

39 files changed

+2579
-22
lines changed

39 files changed

+2579
-22
lines changed

bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/config/DefaultProfilerConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class DefaultProfilerConfig implements ProfilerConfig {
4747

4848
private final Properties properties;
4949

50-
private static final TransportModule DEFAULT_TRANSPORT_MODULE = TransportModule.THRIFT;
50+
private static final TransportModule DEFAULT_TRANSPORT_MODULE = TransportModule.GRPC;
5151

5252
@Value("${profiler.enable:true}")
5353
private boolean profileEnable = false;

bootstraps/bootstrap-core/src/main/java/com/navercorp/pinpoint/bootstrap/config/TransportModule.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,14 @@
2222
* @author Woonduk Kang(emeroad)
2323
*/
2424
public enum TransportModule {
25-
THRIFT,
2625
GRPC;
2726

2827
public static TransportModule parse(String transportModule) {
2928
Objects.requireNonNull(transportModule, "transportModule");
30-
31-
if (isEquals(THRIFT, transportModule)) {
32-
return THRIFT;
33-
}
3429
if (isEquals(GRPC, transportModule)) {
3530
return GRPC;
3631
}
37-
return null;
32+
return GRPC;
3833
}
3934

4035
private static boolean isEquals(TransportModule transportModule, String transportModuleString) {

collector/src/main/java/com/navercorp/pinpoint/collector/cluster/GrpcAgentConnection.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package com.navercorp.pinpoint.collector.cluster;
1818

1919
import com.google.protobuf.GeneratedMessageV3;
20+
import com.navercorp.pinpoint.collector.cluster.grpc.CommandThriftToGrpcMessageConverter;
2021
import com.navercorp.pinpoint.collector.receiver.grpc.PinpointGrpcServer;
2122
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
22-
import com.navercorp.pinpoint.profiler.context.grpc.CommandThriftToGrpcMessageConverter;
2323
import com.navercorp.pinpoint.rpc.PinpointSocketException;
2424
import com.navercorp.pinpoint.rpc.ResponseMessage;
2525
import com.navercorp.pinpoint.rpc.packet.stream.StreamCode;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2019 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.collector.cluster.grpc;
18+
19+
import com.google.protobuf.GeneratedMessageV3;
20+
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
21+
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadCount;
22+
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadDump;
23+
import com.navercorp.pinpoint.grpc.trace.PCmdActiveThreadLightDump;
24+
import com.navercorp.pinpoint.grpc.trace.PCmdEcho;
25+
import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadCount;
26+
import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadDump;
27+
import com.navercorp.pinpoint.thrift.dto.command.TCmdActiveThreadLightDump;
28+
import com.navercorp.pinpoint.thrift.dto.command.TCommandEcho;
29+
30+
/**
31+
* @author Taejin Koo
32+
*/
33+
public class CommandThriftToGrpcMessageConverter implements MessageConverter<Object, GeneratedMessageV3> {
34+
35+
@Override
36+
public GeneratedMessageV3 toMessage(Object message) {
37+
if (message instanceof TCommandEcho) {
38+
return buildPCommandEcho((TCommandEcho) message);
39+
} else if (message instanceof TCmdActiveThreadCount) {
40+
return buildPCmdActiveThreadCount((TCmdActiveThreadCount) message);
41+
} else if (message instanceof TCmdActiveThreadDump) {
42+
return buildPCmdActiveThreadDump((TCmdActiveThreadDump) message);
43+
} else if (message instanceof TCmdActiveThreadLightDump) {
44+
return buildPCmdActiveThreadLightDump((TCmdActiveThreadLightDump) message);
45+
}
46+
return null;
47+
}
48+
49+
private PCmdEcho buildPCommandEcho(TCommandEcho tCommandEcho) {
50+
PCmdEcho.Builder builder = PCmdEcho.newBuilder();
51+
builder.setMessage(tCommandEcho.getMessage());
52+
return builder.build();
53+
}
54+
55+
private PCmdActiveThreadCount buildPCmdActiveThreadCount(TCmdActiveThreadCount tCmdActiveThreadCount) {
56+
PCmdActiveThreadCount.Builder builder = PCmdActiveThreadCount.newBuilder();
57+
return builder.build();
58+
}
59+
60+
private PCmdActiveThreadDump buildPCmdActiveThreadDump(TCmdActiveThreadDump tCmdActiveThreadDump) {
61+
PCmdActiveThreadDump.Builder builder = PCmdActiveThreadDump.newBuilder();
62+
builder.setLimit(tCmdActiveThreadDump.getLimit());
63+
if (tCmdActiveThreadDump.isSetLocalTraceIdList()) {
64+
builder.addAllLocalTraceId(tCmdActiveThreadDump.getLocalTraceIdList());
65+
}
66+
if (tCmdActiveThreadDump.isSetThreadNameList()) {
67+
builder.addAllThreadName(tCmdActiveThreadDump.getThreadNameList());
68+
}
69+
70+
return builder.build();
71+
}
72+
73+
private PCmdActiveThreadLightDump buildPCmdActiveThreadLightDump(TCmdActiveThreadLightDump tCmdActiveThreadLightDump) {
74+
PCmdActiveThreadLightDump.Builder builder = PCmdActiveThreadLightDump.newBuilder();
75+
builder.setLimit(tCmdActiveThreadLightDump.getLimit());
76+
if (tCmdActiveThreadLightDump.isSetLocalTraceIdList()) {
77+
builder.addAllLocalTraceId(tCmdActiveThreadLightDump.getLocalTraceIdList());
78+
}
79+
if (tCmdActiveThreadLightDump.isSetThreadNameList()) {
80+
builder.addAllThreadName(tCmdActiveThreadLightDump.getThreadNameList());
81+
}
82+
83+
return builder.build();
84+
}
85+
86+
}

collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/PinpointGrpcServer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import com.navercorp.pinpoint.grpc.trace.PCmdEcho;
2929
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
3030
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
31-
import com.navercorp.pinpoint.profiler.context.thrift.CommandGrpcToThriftMessageConverter;
31+
import com.navercorp.pinpoint.profiler.sender.message.CommandGrpcToThriftMessageConverter;
3232
import com.navercorp.pinpoint.rpc.PinpointSocketException;
3333
import com.navercorp.pinpoint.rpc.ResponseMessage;
3434
import com.navercorp.pinpoint.rpc.client.RequestManager;

collector/src/main/java/com/navercorp/pinpoint/collector/sender/FlinkTcpDataSender.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
2020
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
2121
import com.navercorp.pinpoint.thrift.io.TBaseSerializer;
22-
import org.apache.thrift.TBase;
23-
import org.apache.logging.log4j.Logger;
2422
import org.apache.logging.log4j.LogManager;
23+
import org.apache.logging.log4j.Logger;
24+
import org.apache.thrift.TBase;
2525
import org.springframework.util.Assert;
2626

2727
import java.util.HashMap;

collector/src/main/java/com/navercorp/pinpoint/collector/service/SendDataToFlinkService.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
package com.navercorp.pinpoint.collector.service;
1717

1818
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
19-
import org.apache.thrift.TBase;
20-
import org.apache.logging.log4j.Logger;
2119
import org.apache.logging.log4j.LogManager;
20+
import org.apache.logging.log4j.Logger;
21+
import org.apache.thrift.TBase;
2222

2323
import java.util.ArrayList;
2424
import java.util.List;

collector/src/test/java/com/navercorp/pinpoint/collector/receiver/DataReceiverGroupTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,13 @@
2121
import com.navercorp.pinpoint.collector.receiver.thrift.TCPReceiverBean;
2222
import com.navercorp.pinpoint.collector.receiver.thrift.UDPReceiverBean;
2323
import com.navercorp.pinpoint.collector.thrift.config.DataReceiverGroupProperties;
24+
import com.navercorp.pinpoint.common.profiler.message.BypassMessageConverter;
25+
import com.navercorp.pinpoint.common.profiler.message.DataSender;
26+
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
2427
import com.navercorp.pinpoint.common.server.util.AddressFilter;
2528
import com.navercorp.pinpoint.io.request.ServerRequest;
2629
import com.navercorp.pinpoint.io.request.ServerResponse;
27-
import com.navercorp.pinpoint.profiler.context.thrift.BypassMessageConverter;
28-
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
2930
import com.navercorp.pinpoint.profiler.sender.ByteMessage;
30-
import com.navercorp.pinpoint.profiler.sender.DataSender;
3131
import com.navercorp.pinpoint.profiler.sender.MessageSerializer;
3232
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
3333
import com.navercorp.pinpoint.profiler.sender.ThriftUdpMessageSerializer;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Copyright 2019 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.navercorp.pinpoint.common.profiler.concurrent.executor;
18+
19+
import com.navercorp.pinpoint.common.profiler.concurrent.PinpointThreadFactory;
20+
import org.apache.logging.log4j.LogManager;
21+
import org.apache.logging.log4j.Logger;
22+
23+
import java.util.Collection;
24+
import java.util.Objects;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.ThreadFactory;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
/**
31+
* @author emeroad
32+
*/
33+
public class AsyncQueueingExecutor<T> {
34+
35+
private final Logger logger;
36+
private final boolean isWarn;
37+
38+
private final LinkedBlockingQueue<T> queue;
39+
private final AtomicBoolean isRun = new AtomicBoolean(true);
40+
private final Thread executeThread;
41+
private final String executorName;
42+
43+
private final int maxDrainSize;
44+
// Caution. single thread only. this Collection is simpler than ArrayList.
45+
private final Collection<T> drain;
46+
47+
private final AsyncQueueingExecutorListener<T> listener;
48+
49+
50+
public AsyncQueueingExecutor(int queueSize, String executorName, AsyncQueueingExecutorListener<T> listener) {
51+
Objects.requireNonNull(executorName, "executorName");
52+
53+
this.logger = LogManager.getLogger(this.getClass().getName() + "@" + executorName);
54+
this.isWarn = logger.isWarnEnabled();
55+
56+
// BEFORE executeThread start
57+
this.maxDrainSize = 10;
58+
this.drain = new UnsafeArrayCollection<>(maxDrainSize);
59+
this.queue = new LinkedBlockingQueue<>(queueSize);
60+
61+
this.executeThread = this.createExecuteThread(executorName);
62+
this.executorName = executeThread.getName();
63+
64+
this.listener = Objects.requireNonNull(listener, "consumer");
65+
}
66+
67+
private Thread createExecuteThread(String executorName) {
68+
final ThreadFactory threadFactory = new PinpointThreadFactory(executorName, true);
69+
Thread thread = threadFactory.newThread(this::doAccept);
70+
thread.start();
71+
return thread;
72+
}
73+
74+
private void doAccept() {
75+
long timeout = 2000;
76+
drainStartEntry:
77+
while (isRun()) {
78+
try {
79+
final Collection<T> dtoList = getDrainQueue();
80+
final int drainSize = takeN(dtoList, this.maxDrainSize);
81+
if (drainSize > 0) {
82+
doAccept(dtoList);
83+
continue;
84+
}
85+
86+
while (isRun()) {
87+
final T dto = takeOne(timeout);
88+
if (dto != null) {
89+
doAccept(dto);
90+
continue drainStartEntry;
91+
} else {
92+
pollTimeout(timeout);
93+
}
94+
}
95+
} catch (Throwable th) {
96+
logger.warn("{} doExecute(). Unexpected Error. Cause:{}", executorName, th.getMessage(), th);
97+
}
98+
}
99+
flushQueue();
100+
}
101+
102+
private void flushQueue() {
103+
boolean debugEnabled = logger.isDebugEnabled();
104+
if (debugEnabled) {
105+
logger.debug("Loop is stop.");
106+
}
107+
while(true) {
108+
final Collection<T> elementList = getDrainQueue();
109+
int drainSize = takeN(elementList, this.maxDrainSize);
110+
if (drainSize == 0) {
111+
break;
112+
}
113+
if (debugEnabled) {
114+
logger.debug("flushData size {}", drainSize);
115+
}
116+
doAccept(elementList);
117+
}
118+
}
119+
120+
private T takeOne(long timeout) {
121+
try {
122+
return queue.poll(timeout, TimeUnit.MILLISECONDS);
123+
} catch (InterruptedException e) {
124+
Thread.currentThread().interrupt();
125+
return null;
126+
}
127+
}
128+
129+
private int takeN(Collection<T> drain, int maxDrainSize) {
130+
return queue.drainTo(drain, maxDrainSize);
131+
}
132+
133+
protected void pollTimeout(long timeout) {
134+
// do nothing
135+
}
136+
137+
public boolean execute(T data) {
138+
if (data == null) {
139+
if (isWarn) {
140+
logger.warn("execute(). data is null");
141+
}
142+
return false;
143+
}
144+
if (!isRun.get()) {
145+
if (isWarn) {
146+
logger.warn("{} is shutdown. discard data:{}", executorName, data);
147+
}
148+
return false;
149+
}
150+
boolean offer = queue.offer(data);
151+
if (!offer) {
152+
if (isWarn) {
153+
logger.warn("{} Drop data. queue is full. size:{}", executorName, queue.size());
154+
}
155+
}
156+
return offer;
157+
}
158+
159+
160+
private void doAccept(Collection<T> elements) {
161+
this.listener.execute(elements);
162+
}
163+
164+
private void doAccept(T element) {
165+
this.listener.execute(element);
166+
}
167+
168+
public boolean isEmpty() {
169+
return queue.isEmpty();
170+
}
171+
172+
public boolean isRun() {
173+
return isRun.get();
174+
}
175+
176+
public void stop() {
177+
isRun.set(false);
178+
179+
if (!isEmpty()) {
180+
logger.info("Wait 5 seconds. Flushing queued data.");
181+
}
182+
executeThread.interrupt();
183+
try {
184+
executeThread.join(5000);
185+
} catch (InterruptedException e) {
186+
Thread.currentThread().interrupt();
187+
logger.warn("{} stopped incompletely.", executorName);
188+
}
189+
190+
logger.info("{} stopped.", executorName);
191+
}
192+
193+
Collection<T> getDrainQueue() {
194+
this.drain.clear();
195+
return drain;
196+
}
197+
}

0 commit comments

Comments
 (0)