Skip to content

Commit 97fc92a

Browse files
committed
[#9666] Fixed realtime to maintain valid value
1 parent 346eeea commit 97fc92a

File tree

3 files changed

+43
-15
lines changed

3 files changed

+43
-15
lines changed

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountFetcherFactory.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import com.navercorp.pinpoint.realtime.dto.ATCSupply;
2222
import reactor.core.publisher.Flux;
2323

24-
import java.util.function.Supplier;
24+
import java.util.function.Function;
2525

2626
/**
2727
* @author youngjin.kim2
@@ -30,19 +30,32 @@ class ActiveThreadCountFetcherFactory implements FetcherFactory<ClusterKey, ATCS
3030

3131
private final PubSubFluxClient<ATCDemand, ATCSupply> endpoint;
3232
private final long recordMaxAgeNanos;
33+
private final long prepareTimeoutNanos;
3334

34-
ActiveThreadCountFetcherFactory(PubSubFluxClient<ATCDemand, ATCSupply> endpoint, long recordMaxAgeNanos) {
35+
ActiveThreadCountFetcherFactory(
36+
PubSubFluxClient<ATCDemand, ATCSupply> endpoint,
37+
long recordMaxAgeNanos,
38+
long prepareTimeoutNanos
39+
) {
3540
this.endpoint = endpoint;
3641
this.recordMaxAgeNanos = recordMaxAgeNanos;
42+
this.prepareTimeoutNanos = prepareTimeoutNanos;
3743
}
3844

3945
@Override
4046
public Fetcher<ATCSupply> getFetcher(ClusterKey key) {
41-
return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos);
47+
return new OptimisticFetcher<>(this.makeValueSupplier(key), this.recordMaxAgeNanos, this.prepareTimeoutNanos);
4248
}
4349

44-
private Supplier<Flux<ATCSupply>> makeValueSupplier(ClusterKey key) {
45-
return () -> this.endpoint.request(makeDemand(key));
50+
private Function<Integer, Flux<ATCSupply>> makeValueSupplier(ClusterKey key) {
51+
return i -> {
52+
final Flux<ATCSupply> response = this.endpoint.request(makeDemand(key));
53+
if (i == 0) {
54+
return response;
55+
} else {
56+
return response.filter(el -> !el.getValues().isEmpty());
57+
}
58+
};
4659
}
4760

4861
private ATCDemand makeDemand(ClusterKey key) {

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/ActiveThreadCountWebDaoConfig.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class ActiveThreadCountWebDaoConfig {
4242
@Value("${pinpoint.web.realtime.atc.supply.expireInMs:3000}")
4343
long supplyExpireInMs;
4444

45+
@Value("${pinpoint.web.realtime.atc.supply.prepareInMs:10000}")
46+
long prepareInMs;
47+
4548
@Bean
4649
PubSubFluxClient<ATCDemand, ATCSupply> atcEndpoint(PubSubClientFactory clientFactory) {
4750
return clientFactory.build(RealtimePubSubServiceDescriptors.ATC);
@@ -62,7 +65,9 @@ FetcherFactory<ClusterKey, ATCSupply> atcSupplyFetcherFactory(
6265
Cache<ClusterKey, Fetcher<ATCSupply>> fetcherCache
6366
) {
6467
final long recordMaxAgeNanos = TimeUnit.MILLISECONDS.toNanos(supplyExpireInMs);
65-
final ActiveThreadCountFetcherFactory fetcherFactory = new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos);
68+
final long prepareInNanos = TimeUnit.MILLISECONDS.toNanos(prepareInMs);
69+
final ActiveThreadCountFetcherFactory fetcherFactory
70+
= new ActiveThreadCountFetcherFactory(endpoint, recordMaxAgeNanos, prepareInNanos);
6671
return new CachedFetcherFactory<>(fetcherFactory, fetcherCache);
6772
}
6873

realtime/realtime-web/src/main/java/com/navercorp/pinpoint/web/realtime/activethread/count/dao/OptimisticFetcher.java

+19-9
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@
2323

2424
import java.util.Objects;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.concurrent.atomic.AtomicLong;
2728
import java.util.concurrent.atomic.AtomicReference;
28-
import java.util.function.Supplier;
29+
import java.util.function.Function;
2930

3031
/**
3132
* @author youngjin.kim2
@@ -34,27 +35,34 @@ public class OptimisticFetcher<T> implements Fetcher<T> {
3435

3536
private static final Logger logger = LogManager.getLogger(OptimisticFetcher.class);
3637

37-
private final Supplier<Flux<T>> valueSupplier;
38+
private final Function<Integer, Flux<T>> valueSupplier;
3839
private final long recordMaxAgeNanos;
40+
private final long prepareTimeoutNanos;
3941

4042
private final AtomicReference<Record<T>> recordRef = new AtomicReference<>();
4143
private final Throttle prepareThrottle = new MinTermThrottle(TimeUnit.SECONDS.toNanos(3));
44+
private final AtomicInteger numPrepared = new AtomicInteger(0);
4245
private final AtomicLong latestPrepareTime = new AtomicLong(0);
4346

44-
public OptimisticFetcher(Supplier<Flux<T>> valueSupplier, long recordMaxAgeNanos) {
47+
public OptimisticFetcher(
48+
Function<Integer, Flux<T>> valueSupplier,
49+
long recordMaxAgeNanos,
50+
long prepareTimeoutNanos
51+
) {
4552
this.valueSupplier = Objects.requireNonNull(valueSupplier, "valueSupplier");
4653
this.recordMaxAgeNanos = recordMaxAgeNanos;
54+
this.prepareTimeoutNanos = prepareTimeoutNanos;
4755
}
4856

4957
@Override
5058
public T fetch() {
5159
final Record<T> latestRecord = this.recordRef.get();
52-
if (latestRecord == null || latestRecord.isOld(System.nanoTime() - this.recordMaxAgeNanos)) {
60+
if (latestRecord == null || latestRecord.olderThan(System.nanoTime() - this.recordMaxAgeNanos)) {
5361
prepareForNext();
5462
return null;
5563
}
5664

57-
if (this.latestPrepareTime.get() < System.nanoTime() - TimeUnit.SECONDS.toNanos(12)) {
65+
if (this.latestPrepareTime.get() < System.nanoTime() - this.prepareTimeoutNanos) {
5866
prepareForNext();
5967
}
6068

@@ -64,19 +72,21 @@ public T fetch() {
6472
private void prepareForNext() {
6573
if (this.prepareThrottle.hit()) {
6674
logger.debug("Fetcher Started");
67-
this.valueSupplier.get()
75+
final long prepareTime = System.nanoTime();
76+
this.valueSupplier.apply(this.numPrepared.getAndIncrement())
6877
.doOnNext(item -> logger.trace("Fetcher Received: {}", item))
6978
.doOnComplete(() -> logger.debug("Fetcher Completed"))
7079
.subscribe(this::put);
71-
this.latestPrepareTime.set(System.nanoTime());
80+
this.latestPrepareTime.set(prepareTime);
7281
}
7382
}
7483

7584
private void put(T supply) {
7685
if (supply == null) {
7786
return;
7887
}
79-
this.recordRef.set(new Record<>(supply));
88+
final Record<T> nextRecord = new Record<>(supply);
89+
this.recordRef.set(nextRecord);
8090
}
8191

8292
private static final class Record<T> {
@@ -93,7 +103,7 @@ T getValue() {
93103
return this.value;
94104
}
95105

96-
boolean isOld(long thresholdNanos) {
106+
boolean olderThan(long thresholdNanos) {
97107
return this.createdAt < thresholdNanos;
98108
}
99109

0 commit comments

Comments
 (0)