Skip to content

Commit 60ceb98

Browse files
committed
[pinpoint-apm#9932] Removed thrift dependency of Agent module`
1 parent aab4723 commit 60ceb98

File tree

8 files changed

+62
-71
lines changed

8 files changed

+62
-71
lines changed

commons-profiler/src/main/java/com/navercorp/pinpoint/common/profiler/concurrent/executor/AsyncQueueingExecutor.java

+22-15
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ThreadFactory;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.function.Consumer;
2930

3031
/**
3132
* @author emeroad
@@ -43,10 +44,13 @@ public class AsyncQueueingExecutor<T> {
4344
// Caution. single thread only. this Collection is simpler than ArrayList.
4445
private final Collection<T> drain;
4546

46-
private final AsyncQueueingExecutorListener<T> listener;
47+
private final MultiConsumer<T> consumer;
4748

49+
public AsyncQueueingExecutor(int queueSize, String executorName, Consumer<T> consumer) {
50+
this(queueSize, executorName, new SingleConsumer<>(consumer));
51+
}
4852

49-
public AsyncQueueingExecutor(int queueSize, String executorName, AsyncQueueingExecutorListener<T> listener) {
53+
public AsyncQueueingExecutor(int queueSize, String executorName, MultiConsumer<T> consumer) {
5054
Objects.requireNonNull(executorName, "executorName");
5155

5256
this.logger = LogManager.getLogger(this.getClass().getName() + "@" + executorName);
@@ -59,32 +63,32 @@ public AsyncQueueingExecutor(int queueSize, String executorName, AsyncQueueingEx
5963
this.executeThread = this.createExecuteThread(executorName);
6064
this.executorName = executeThread.getName();
6165

62-
this.listener = Objects.requireNonNull(listener, "listener");
66+
this.consumer = Objects.requireNonNull(consumer, "consumer");
6367
}
6468

6569
private Thread createExecuteThread(String executorName) {
6670
final ThreadFactory threadFactory = new PinpointThreadFactory(executorName, true);
67-
Thread thread = threadFactory.newThread(this::doExecute);
71+
Thread thread = threadFactory.newThread(this::doAccept);
6872
thread.start();
6973
return thread;
7074
}
7175

72-
private void doExecute() {
76+
private void doAccept() {
7377
long timeout = 2000;
7478
drainStartEntry:
7579
while (isRun()) {
7680
try {
7781
final Collection<T> dtoList = getDrainQueue();
7882
final int drainSize = takeN(dtoList, this.maxDrainSize);
7983
if (drainSize > 0) {
80-
doExecute(dtoList);
84+
doAccept(dtoList);
8185
continue;
8286
}
8387

8488
while (isRun()) {
8589
final T dto = takeOne(timeout);
8690
if (dto != null) {
87-
doExecute(dto);
91+
doAccept(dto);
8892
continue drainStartEntry;
8993
} else {
9094
pollTimeout(timeout);
@@ -102,16 +106,16 @@ private void flushQueue() {
102106
if (debugEnabled) {
103107
logger.debug("Loop is stop.");
104108
}
105-
while(true) {
109+
while (true) {
106110
final Collection<T> elementList = getDrainQueue();
107-
int drainSize = takeN(elementList, this.maxDrainSize);
111+
int drainSize = takeN(elementList, this.maxDrainSize);
108112
if (drainSize == 0) {
109113
break;
110114
}
111115
if (debugEnabled) {
112116
logger.debug("flushData size {}", drainSize);
113117
}
114-
doExecute(elementList);
118+
doAccept(elementList);
115119
}
116120
}
117121

@@ -149,12 +153,16 @@ public boolean execute(T data) {
149153
}
150154

151155

152-
private void doExecute(Collection<T> elements) {
153-
this.listener.execute(elements);
156+
private void doAccept(Collection<T> elements) {
157+
try {
158+
this.consumer.acceptN(elements);
159+
} finally {
160+
elements.clear();
161+
}
154162
}
155163

156-
private void doExecute(T element) {
157-
this.listener.execute(element);
164+
private void doAccept(T element) {
165+
this.consumer.accept(element);
158166
}
159167

160168
public boolean isEmpty() {
@@ -183,7 +191,6 @@ public void stop() {
183191
}
184192

185193
Collection<T> getDrainQueue() {
186-
this.drain.clear();
187194
return drain;
188195
}
189196
}
Original file line numberDiff line numberDiff line change
@@ -24,23 +24,17 @@
2424
/**
2525
* @author emeroad
2626
*/
27-
public class EmptyAsyncQueueingExecutorListener<T> implements AsyncQueueingExecutorListener<T> {
27+
public class EmptyConsumer<T> implements MultiConsumer<T> {
2828

2929
private final Logger logger = LogManager.getLogger(this.getClass());
3030

31-
private final boolean isDebug = logger.isDebugEnabled();
32-
3331
@Override
34-
public void execute(Collection<T> dtoList) {
35-
if (isDebug) {
36-
logger.debug("execute(N)");
37-
}
32+
public void acceptN(Collection<T> dtoList) {
33+
logger.debug("execute(N)");
3834
}
3935

4036
@Override
41-
public void execute(T dto) {
42-
if (isDebug) {
43-
logger.debug("execute()");
44-
}
37+
public void accept(T dto) {
38+
logger.debug("execute()");
4539
}
4640
}
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
/**
2222
* @author emeroad
2323
*/
24-
public interface AsyncQueueingExecutorListener<T> {
24+
public interface MultiConsumer<T> {
2525

26-
void execute(Collection<T> messageList);
26+
void acceptN(Collection<T> messageList);
2727

28-
void execute(T message);
28+
void accept(T message);
2929

3030
}
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,24 @@
2020
import org.apache.logging.log4j.Logger;
2121

2222
import java.util.Collection;
23+
import java.util.Objects;
24+
import java.util.function.Consumer;
2325

2426
/**
2527
* @author Woonduk Kang(emeroad)
2628
*/
27-
public abstract class DefaultAsyncQueueingExecutorListener<T> implements AsyncQueueingExecutorListener<T> {
29+
public class SingleConsumer<T> implements MultiConsumer<T> {
2830

2931
private final Logger logger = LogManager.getLogger(this.getClass());
3032

33+
private final Consumer<T> consumer;
34+
35+
public SingleConsumer(Consumer<T> consumer) {
36+
this.consumer = Objects.requireNonNull(consumer, "consumer");
37+
}
38+
3139
@Override
32-
public void execute(Collection<T> messageList) {
40+
public void acceptN(Collection<T> messageList) {
3341
// Cannot use toArray(T[] array) because passed messageList doesn't implement it properly.
3442
Object[] dataList = messageList.toArray();
3543

@@ -39,14 +47,15 @@ public void execute(Collection<T> messageList) {
3947
final int size = messageList.size();
4048
for (int i = 0; i < size; i++) {
4149
try {
42-
execute((T) dataList[i]);
50+
this.accept((T) dataList[i]);
4351
} catch (Throwable th) {
4452
logger.warn("Unexpected Error. Cause:{}", th.getMessage(), th);
4553
}
4654
}
4755
}
4856

49-
50-
public abstract void execute(T message);
57+
public void accept(T message) {
58+
this.consumer.accept(message);
59+
}
5160

5261
}

profiler-thrift/src/main/java/com/navercorp/pinpoint/profiler/thrift/sender/TcpDataSender.java

+2-13
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
package com.navercorp.pinpoint.profiler.thrift.sender;
1818

1919

20-
2120
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutor;
22-
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutorListener;
23-
import com.navercorp.pinpoint.common.profiler.concurrent.executor.DefaultAsyncQueueingExecutorListener;
2421
import com.navercorp.pinpoint.common.profiler.message.BypassMessageConverter;
2522
import com.navercorp.pinpoint.common.profiler.message.EnhancedDataSender;
2623
import com.navercorp.pinpoint.common.profiler.message.MessageConverter;
@@ -51,7 +48,6 @@
5148
import java.util.concurrent.TimeUnit;
5249
import java.util.concurrent.atomic.AtomicBoolean;
5350
import java.util.function.BiConsumer;
54-
import java.util.function.Consumer;
5551

5652
/**
5753
* @author emeroad
@@ -121,14 +117,7 @@ private TcpDataSender(String name, ClientFactoryUtils.PinpointClientProvider cli
121117
}
122118

123119
private AsyncQueueingExecutor<Object> createAsyncQueueingExecutor(int queueSize, String executorName) {
124-
AsyncQueueingExecutorListener<Object> listener = new DefaultAsyncQueueingExecutorListener<Object>() {
125-
@Override
126-
public void execute(Object message) {
127-
TcpDataSender.this.sendPacket(message);
128-
}
129-
};
130-
final AsyncQueueingExecutor<Object> executor = new AsyncQueueingExecutor<>(queueSize, executorName, listener);
131-
return executor;
120+
return new AsyncQueueingExecutor<>(queueSize, executorName, this::sendPacket);
132121
}
133122

134123
private Logger newLogger(String name) {
@@ -190,7 +179,7 @@ public boolean isConnected() {
190179
}
191180

192181
@Override
193-
public boolean addEventListener(Consumer<PinpointClient> eventListener) {
182+
public boolean addEventListener(java.util.function.Consumer<PinpointClient> eventListener) {
194183
return this.client.addPinpointClientReconnectEventListener(eventListener);
195184
}
196185

profiler-thrift/src/main/java/com/navercorp/pinpoint/profiler/thrift/sender/UdpDataSender.java

+1-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import com.navercorp.pinpoint.common.plugin.util.HostAndPort;
2020
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutor;
21-
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutorListener;
22-
import com.navercorp.pinpoint.common.profiler.concurrent.executor.DefaultAsyncQueueingExecutorListener;
2321
import com.navercorp.pinpoint.common.profiler.message.DataSender;
2422
import com.navercorp.pinpoint.common.util.Assert;
2523
import com.navercorp.pinpoint.rpc.client.DnsSocketAddressProvider;
@@ -88,13 +86,7 @@ public boolean send(T data) {
8886
}
8987

9088
private AsyncQueueingExecutor<T> createAsyncQueueingExecutor(int queueSize, String executorName) {
91-
AsyncQueueingExecutorListener<T> consumer = new DefaultAsyncQueueingExecutorListener<T>() {
92-
@Override
93-
public void execute(T message) {
94-
UdpDataSender.this.sendPacket(message);
95-
}
96-
};
97-
return new AsyncQueueingExecutor<>(queueSize, executorName, consumer);
89+
return new AsyncQueueingExecutor<>(queueSize, executorName, this::sendPacket);
9890
}
9991

10092
@Override

profiler/src/main/java/com/navercorp/pinpoint/profiler/context/storage/AsyncQueueingUriStatStorage.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.navercorp.pinpoint.common.profiler.clock.Clock;
2121
import com.navercorp.pinpoint.common.profiler.clock.TickClock;
2222
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutor;
23-
import com.navercorp.pinpoint.common.profiler.concurrent.executor.AsyncQueueingExecutorListener;
23+
import com.navercorp.pinpoint.common.profiler.concurrent.executor.MultiConsumer;
2424
import com.navercorp.pinpoint.common.profiler.logging.ThrottledLogger;
2525
import com.navercorp.pinpoint.common.util.Assert;
2626
import com.navercorp.pinpoint.common.util.CollectionUtils;
@@ -40,19 +40,19 @@ public class AsyncQueueingUriStatStorage extends AsyncQueueingExecutor<UriStatIn
4040

4141
private static final Logger LOGGER = LogManager.getLogger(AsyncQueueingUriStatStorage.class);
4242
private static final ThrottledLogger TLogger = ThrottledLogger.getLogger(LOGGER, 100);
43-
private final ExecutorListener executorListener;
43+
private final UriStatConsumer consumer;
4444

4545
public AsyncQueueingUriStatStorage(int queueSize, int uriStatDataLimitSize, String executorName) {
46-
this(queueSize, executorName, new ExecutorListener (uriStatDataLimitSize));
46+
this(queueSize, executorName, new UriStatConsumer(uriStatDataLimitSize));
4747
}
4848

4949
public AsyncQueueingUriStatStorage(int queueSize, int uriStatDataLimitSize, String executorName, int collectInterval) {
50-
this(queueSize, executorName, new ExecutorListener (uriStatDataLimitSize, collectInterval));
50+
this(queueSize, executorName, new UriStatConsumer(uriStatDataLimitSize, collectInterval));
5151
}
5252

53-
private AsyncQueueingUriStatStorage(int queueSize, String executorName, ExecutorListener executorListener) {
54-
super(queueSize, executorName, executorListener);
55-
this.executorListener = executorListener;
53+
private AsyncQueueingUriStatStorage(int queueSize, String executorName, UriStatConsumer consumer) {
54+
super(queueSize, executorName, consumer);
55+
this.consumer = consumer;
5656
}
5757

5858
@Override
@@ -69,7 +69,7 @@ public void store(String uri, boolean status, long startTime, long endTime) {
6969

7070
@Override
7171
public AgentUriStatData poll() {
72-
return executorListener.pollCompletedData();
72+
return consumer.pollCompletedData();
7373
}
7474

7575
@Override
@@ -79,10 +79,10 @@ public void close() {
7979

8080
@Override
8181
protected void pollTimeout(long timeout) {
82-
executorListener.executePollTimeout();
82+
consumer.executePollTimeout();
8383
}
8484

85-
static class ExecutorListener implements AsyncQueueingExecutorListener<UriStatInfo> {
85+
static class UriStatConsumer implements MultiConsumer<UriStatInfo> {
8686

8787
private static final int DEFAULT_COLLECT_INTERVAL = 30000; // 30s
8888

@@ -94,11 +94,11 @@ static class ExecutorListener implements AsyncQueueingExecutorListener<UriStatI
9494
private final Snapshot<AgentUriStatData> snapshotManager;
9595

9696

97-
public ExecutorListener(int uriStatDataLimitSize) {
97+
public UriStatConsumer(int uriStatDataLimitSize) {
9898
this(uriStatDataLimitSize, DEFAULT_COLLECT_INTERVAL);
9999
}
100100

101-
public ExecutorListener(int uriStatDataLimitSize, int collectInterval) {
101+
public UriStatConsumer(int uriStatDataLimitSize, int collectInterval) {
102102
Assert.isTrue(uriStatDataLimitSize > 0, "uriStatDataLimitSize must be ' > 0'");
103103

104104
Assert.isTrue(collectInterval > 0, "collectInterval must be ' > 0'");
@@ -109,7 +109,7 @@ public ExecutorListener(int uriStatDataLimitSize, int collectInterval) {
109109
}
110110

111111
@Override
112-
public void execute(Collection<UriStatInfo> messageList) {
112+
public void acceptN(Collection<UriStatInfo> messageList) {
113113
final long currentBaseTimestamp = clock.millis();
114114
checkAndFlushOldData(currentBaseTimestamp);
115115

@@ -122,7 +122,7 @@ public void execute(Collection<UriStatInfo> messageList) {
122122
}
123123

124124
@Override
125-
public void execute(UriStatInfo message) {
125+
public void accept(UriStatInfo message) {
126126
long currentBaseTimestamp = clock.millis();
127127
checkAndFlushOldData(currentBaseTimestamp);
128128

profiler/src/test/java/com/navercorp/pinpoint/profiler/context/MockApplicationContextFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public DefaultApplicationContext build(ProfilerConfig config, ModuleFactory modu
4141
String mockAgentName = "mockAgentName";
4242
String mockApplicationName = "mockApplicationName";
4343
AgentOption agentOption = new DefaultAgentOption(instrumentation, mockAgentId, mockAgentName, mockApplicationName, false,
44-
config, Collections.<String>emptyList(), Collections.<String>emptyList());
44+
config, Collections.emptyList(), Collections.emptyList());
4545
return new DefaultApplicationContext(agentOption, moduleFactory);
4646
}
4747

0 commit comments

Comments
 (0)