Skip to content

Commit ccaafd9

Browse files
committed
[pinpoint-apm#9666] temporary
1 parent 77ae35e commit ccaafd9

File tree

15 files changed

+406
-191
lines changed

15 files changed

+406
-191
lines changed

realtime/realtime-common/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@
5151
<artifactId>gson</artifactId>
5252
<version>2.10.1</version>
5353
</dependency>
54+
<dependency>
55+
<groupId>com.google.guava</groupId>
56+
<artifactId>guava</artifactId>
57+
</dependency>
5458
<dependency>
5559
<groupId>com.fasterxml.jackson.core</groupId>
5660
<artifactId>jackson-databind</artifactId>

realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/RealtimeCommonConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ PubSubEndpointDescription<ATCDemand, ATCSupply> atcDescription() {
4646
public String getSupplyKey(ATCDemand demand) {
4747
return demand.getApplicationName() + ':' + demand.getAgentId() + ':' + demand.getDemandId();
4848
}
49+
50+
@Override
51+
public boolean isLongTerm() {
52+
return true;
53+
}
4954
};
5055
}
5156

realtime/realtime-common/src/main/java/com/navercorp/pinpoint/realtime/dto/ATCSupply.java

+10
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,14 @@ public int hashCode() {
8383
return Objects.hash(agentId, startTimestamp, collectorId, values);
8484
}
8585

86+
@Override
87+
public String toString() {
88+
return "ATCSupply{" +
89+
"applicationName='" + applicationName + '\'' +
90+
", agentId='" + agentId + '\'' +
91+
", startTimestamp=" + startTimestamp +
92+
", collectorId='" + collectorId + '\'' +
93+
", values=" + values +
94+
'}';
95+
}
8696
}

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/RealtimeWebConfig.java

+13
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
package com.navercorp.pinpoint.web.realtime;
1717

18+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
1819
import com.navercorp.pinpoint.realtime.RealtimeCommonConfig;
1920
import com.navercorp.pinpoint.web.realtime.atc.RealtimeWebATCConfig;
2021
import com.navercorp.pinpoint.web.realtime.atd.RealtimeWebATDConfig;
2122
import com.navercorp.pinpoint.web.realtime.service.RealtimeWebServiceConfig;
2223
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
24+
import org.springframework.context.annotation.Bean;
2325
import org.springframework.context.annotation.Configuration;
2426
import org.springframework.context.annotation.Import;
2527

28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ThreadFactory;
31+
2632
/**
2733
* @author youngjin.kim2
2834
*/
@@ -35,4 +41,11 @@
3541
RealtimeCommonConfig.class
3642
})
3743
public class RealtimeWebConfig {
44+
45+
@Bean("redisMessageListenerTaskExecutor")
46+
ExecutorService redisMessageListenerTaskExecutor() {
47+
final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("redisMessageExecutor-%d").build();
48+
return Executors.newFixedThreadPool(8, threadFactory);
49+
}
50+
3851
}

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/atc/dao/ATCDaoImpl.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,17 @@
1515
*/
1616
package com.navercorp.pinpoint.web.realtime.atc.dao;
1717

18+
import com.google.common.cache.Cache;
19+
import com.google.common.cache.CacheBuilder;
1820
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
1921
import com.navercorp.pinpoint.pubsub.IdentifierProvider;
2022
import com.navercorp.pinpoint.pubsub.endpoint.PubSubEndpoint;
2123
import com.navercorp.pinpoint.realtime.dto.ATCDemand;
2224
import com.navercorp.pinpoint.realtime.dto.ATCSupply;
2325

2426
import java.util.List;
25-
import java.util.Map;
2627
import java.util.Objects;
27-
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.TimeUnit;
2829

2930
/**
3031
* @author youngjin.kim2
@@ -35,7 +36,11 @@ public class ATCDaoImpl implements ATCDao {
3536
private final PubSubEndpoint<ATCDemand, ATCSupply> endpoint;
3637
private final long recordMaxAgeNanos;
3738

38-
private final Map<ClusterKey, ATCFetcher> fetcherMap = new ConcurrentHashMap<>();
39+
private final Cache<ClusterKey, ATCFetcher> fetcherMap = CacheBuilder.newBuilder()
40+
.expireAfterAccess(1, TimeUnit.HOURS)
41+
.initialCapacity(512)
42+
.maximumSize(65536)
43+
.build();
3944

4045
public ATCDaoImpl(
4146
IdentifierProvider idProvider,
@@ -53,10 +58,13 @@ public List<Integer> fetch(ClusterKey clusterKey) {
5358
}
5459

5560
private ATCFetcher getFetcher(ClusterKey clusterKey) {
56-
return fetcherMap.computeIfAbsent(
57-
clusterKey,
58-
key -> new ATCFetcherImpl(this.idProvider, this.endpoint, key, recordMaxAgeNanos)
59-
);
61+
final ATCFetcher fetcher = fetcherMap.getIfPresent(clusterKey);
62+
if (fetcher != null) {
63+
return fetcher;
64+
}
65+
final ATCFetcher newFetcher = new ATCFetcherImpl(idProvider, endpoint, clusterKey, recordMaxAgeNanos);
66+
fetcherMap.put(clusterKey, newFetcher);
67+
return newFetcher;
6068
}
6169

6270
}

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/atc/dao/ATCFetcherImpl.java

+20-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.navercorp.pinpoint.realtime.dto.ATCSupply;
2323
import com.navercorp.pinpoint.realtime.util.MinTermThrottle;
2424
import com.navercorp.pinpoint.realtime.util.Throttle;
25+
import org.apache.logging.log4j.LogManager;
26+
import org.apache.logging.log4j.Logger;
2527

2628
import java.util.List;
2729
import java.util.Objects;
@@ -34,6 +36,8 @@
3436
*/
3537
public class ATCFetcherImpl implements ATCFetcher {
3638

39+
private static final Logger logger = LogManager.getLogger(ATCFetcherImpl.class);
40+
3741
private final IdentifierProvider idProvider;
3842
private final PubSubEndpoint<ATCDemand, ATCSupply> endpoint;
3943
private final ClusterKey clusterKey;
@@ -63,7 +67,7 @@ public List<Integer> fetch() {
6367
return null;
6468
}
6569

66-
if (this.latestPrepareTime.get() < System.nanoTime() - TimeUnit.SECONDS.toNanos(7)) {
70+
if (this.latestPrepareTime.get() < System.nanoTime() - TimeUnit.SECONDS.toNanos(12)) {
6771
prepareForNext();
6872
}
6973

@@ -72,7 +76,12 @@ public List<Integer> fetch() {
7276

7377
private void prepareForNext() {
7478
if (this.prepareThrottle.hit()) {
75-
this.endpoint.requestStream(getDemand()).subscribe(this::put);
79+
final ATCDemand demand = getDemand();
80+
logger.debug("ATC Fetch Stream({}) Started", demand.getDemandId());
81+
this.endpoint.requestStream(demand)
82+
.doOnNext(supply -> logger.trace("ATC Fetch Stream({}) Received: {}", demand.getDemandId(), supply))
83+
.doOnComplete(() -> logger.debug("ATC Fetch Stream({}) Completed", demand.getDemandId()))
84+
.subscribe(this::put);
7685
this.latestPrepareTime.set(System.nanoTime());
7786
}
7887
}
@@ -82,7 +91,11 @@ private void put(ATCSupply supply) {
8291
}
8392

8493
private void put(List<Integer> values) {
85-
this.recordRef.set(new Record(values));
94+
if (!values.isEmpty()) {
95+
this.recordRef.set(new Record(values));
96+
} else {
97+
this.recordRef.compareAndSet(null, Record.EMPTY);
98+
}
8699
}
87100

88101
private ATCDemand getDemand() {
@@ -95,6 +108,9 @@ private ATCDemand getDemand() {
95108
}
96109

97110
private static final class Record {
111+
112+
private static final Record EMPTY = new Record(List.of());
113+
98114
private final List<Integer> values;
99115
private final long createdAt;
100116

@@ -110,6 +126,7 @@ public List<Integer> getValues() {
110126
public boolean isOld(long thresholdNanos) {
111127
return createdAt < thresholdNanos;
112128
}
129+
113130
}
114131

115132
}

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/atc/dto/ActiveThreadCountResponse.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package com.navercorp.pinpoint.web.realtime.atc.dto;
1717

18+
import com.navercorp.pinpoint.common.server.cluster.ClusterKey;
19+
1820
import java.util.HashMap;
1921
import java.util.List;
2022
import java.util.Map;
@@ -48,7 +50,27 @@ public Result getResult() {
4850
return result;
4951
}
5052

51-
public void putActiveThreadCount(String agentId, int code, String message, List<Integer> status) {
53+
public void putSuccessAgent(ClusterKey agentKey, List<Integer> values) {
54+
putAgent(agentKey.getAgentId(), 0, "OK", values);
55+
}
56+
57+
public void putFailureAgent(ClusterKey agentKey, boolean hasResponse, long connectUntil) {
58+
final String message = decideMessage(hasResponse, connectUntil);
59+
putAgent(agentKey.getAgentId(), -1, message, List.of());
60+
}
61+
62+
private String decideMessage(boolean hasResponse, long connectUntil) {
63+
if (hasResponse) {
64+
return "CONNECTED";
65+
}
66+
if (System.currentTimeMillis() <= connectUntil) {
67+
return "CONNECTING";
68+
} else {
69+
return "NO SIGNAL";
70+
}
71+
}
72+
73+
private void putAgent(String agentId, int code, String message, List<Integer> status) {
5274
final Result.Count count = new Result.Count(code, message, status);
5375
result.activeThreadCounts.put(agentId, count);
5476
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2023 NAVER Corp.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.navercorp.pinpoint.web.realtime.atc.websocket;
17+
18+
import com.navercorp.pinpoint.web.task.TimerTaskDecorator;
19+
import com.navercorp.pinpoint.web.task.TimerTaskDecoratorFactory;
20+
import org.springframework.web.socket.WebSocketSession;
21+
import reactor.core.Disposable;
22+
23+
import java.util.concurrent.locks.Lock;
24+
import java.util.concurrent.locks.ReentrantLock;
25+
import java.util.function.Consumer;
26+
27+
/**
28+
* @author youngjin.kim2
29+
*/
30+
final class ATCSessionContext {
31+
32+
private static final String KEY_ATTR = "atcAttrs";
33+
34+
private final TimerTaskDecorator taskDecorator;
35+
36+
private final Lock lock = new ReentrantLock();
37+
private final long sessionCreatedAt = System.currentTimeMillis();
38+
private Disposable subscription = null;
39+
40+
private ATCSessionContext(TimerTaskDecorator taskDecorator) {
41+
this.taskDecorator = taskDecorator;
42+
}
43+
44+
static void newContext(WebSocketSession session, TimerTaskDecoratorFactory taskDecoratorFactory) {
45+
final ATCSessionContext ctx = new ATCSessionContext(getTimerTaskDecorator(taskDecoratorFactory));
46+
session.getAttributes().put(KEY_ATTR, ctx);
47+
}
48+
49+
private static TimerTaskDecorator getTimerTaskDecorator(TimerTaskDecoratorFactory taskDecoratorFactory) {
50+
if (taskDecoratorFactory == null) {
51+
return null;
52+
}
53+
return taskDecoratorFactory.createTimerTaskDecorator();
54+
}
55+
56+
static void runWithLockedContext(WebSocketSession session, Consumer<ATCSessionContext> consumer) {
57+
final ATCSessionContext ctx = get(session);
58+
if (ctx != null) {
59+
final Lock lock = ctx.lock;
60+
lock.lock();
61+
consumer.accept(ctx);
62+
lock.unlock();
63+
}
64+
}
65+
66+
static void dispose(WebSocketSession session) {
67+
runWithLockedContext(session, ctx -> {
68+
final Disposable subscription = ctx.getSubscription();
69+
if (subscription != null) {
70+
subscription.dispose();
71+
}
72+
});
73+
}
74+
75+
long getSessionCreatedAt() {
76+
return sessionCreatedAt;
77+
}
78+
79+
TimerTaskDecorator getTaskDecorator() {
80+
return taskDecorator;
81+
}
82+
83+
private Disposable getSubscription() {
84+
return subscription;
85+
}
86+
87+
void setSubscription(Disposable newSubscription) {
88+
final Disposable prevSubscription = this.subscription;
89+
this.subscription = newSubscription;
90+
if (prevSubscription != null) {
91+
prevSubscription.dispose();
92+
}
93+
}
94+
95+
static ATCSessionContext get(WebSocketSession session) {
96+
return (ATCSessionContext) session.getAttributes().get(KEY_ATTR);
97+
}
98+
99+
}

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/atc/websocket/ATCWebSocketConfig.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import com.navercorp.pinpoint.web.realtime.atc.dao.ATCDao;
1919
import com.navercorp.pinpoint.web.realtime.atc.dao.ATCWebDaoConfig;
2020
import com.navercorp.pinpoint.web.realtime.service.AgentLookupService;
21+
import com.navercorp.pinpoint.web.task.TimerTaskDecoratorFactory;
2122
import com.navercorp.pinpoint.web.websocket.PinpointWebSocketHandler;
23+
import org.springframework.beans.factory.annotation.Autowired;
2224
import org.springframework.context.annotation.Bean;
2325
import org.springframework.context.annotation.Configuration;
2426
import org.springframework.context.annotation.Import;
2527
import reactor.core.publisher.Flux;
28+
import reactor.core.scheduler.Schedulers;
2629

2730
import java.time.Duration;
2831

@@ -36,9 +39,15 @@ public class ATCWebSocketConfig {
3639
@Bean
3740
PinpointWebSocketHandler redisActiveThreadCountHandler(
3841
ATCDao dao,
39-
AgentLookupService agentLookupService
42+
AgentLookupService agentLookupService,
43+
@Autowired(required = false) TimerTaskDecoratorFactory timerTaskDecoratorFactory
4044
) {
41-
return new RedisActiveThreadCountHandler(dao, agentLookupService, Flux.interval(Duration.ofSeconds(1)));
45+
return new ActiveThreadCountHandler2(
46+
dao,
47+
agentLookupService,
48+
timerTaskDecoratorFactory,
49+
Flux.interval(Duration.ofMillis(1000), Schedulers.newParallel("atcFlusher", 8))
50+
);
4251
}
4352

4453
}

0 commit comments

Comments
 (0)