Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix InMemoryMetricsRepository can't keep the last five minutes data problem and Improve read-write performance #1319

Merged
merged 2 commits into from
Mar 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
*/
package com.alibaba.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.util.StringUtil;
import com.alibaba.csp.sentinel.util.TimeUtil;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.util.StringUtil;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import org.springframework.stereotype.Component;

/**
* Caches metrics data in a period of time in memory.
*
Expand All @@ -44,54 +44,71 @@ public class InMemoryMetricsRepository implements MetricsRepository<MetricEntity
/**
* {@code app -> resource -> timestamp -> metric}
*/
private Map<String, Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();
private Map<String, Map<String, LinkedHashMap<Long, MetricEntity>>> allMetrics = new ConcurrentHashMap<>();

private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();


@Override
public synchronized void save(MetricEntity entity) {
public void save(MetricEntity entity) {
if (entity == null || StringUtil.isBlank(entity.getApp())) {
return;
}
allMetrics.computeIfAbsent(entity.getApp(), e -> new ConcurrentHashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new ConcurrentLinkedHashMap.Builder<Long, MetricEntity>()
.maximumWeightedCapacity(MAX_METRIC_LIVE_TIME_MS).weigher((key, value) -> {
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed.
int weight = (int)(System.currentTimeMillis() - key);
// weight must be a number greater than or equal to one
return Math.max(weight, 1);
}).build()).put(entity.getTimestamp().getTime(), entity);
readWriteLock.writeLock().lock();
try {
allMetrics.computeIfAbsent(entity.getApp(), e -> new HashMap<>(16))
.computeIfAbsent(entity.getResource(), e -> new LinkedHashMap<Long, MetricEntity>() {
@Override
protected boolean removeEldestEntry(Entry<Long, MetricEntity> eldest) {
// Metric older than {@link #MAX_METRIC_LIVE_TIME_MS} will be removed.
return eldest.getKey() < TimeUtil.currentTimeMillis() - MAX_METRIC_LIVE_TIME_MS;
}
}).put(entity.getTimestamp().getTime(), entity);
} finally {
readWriteLock.writeLock().unlock();
}

}

@Override
public synchronized void saveAll(Iterable<MetricEntity> metrics) {
public void saveAll(Iterable<MetricEntity> metrics) {
if (metrics == null) {
return;
}
metrics.forEach(this::save);
readWriteLock.writeLock().lock();
try {
metrics.forEach(this::save);
} finally {
readWriteLock.writeLock().unlock();
}
}

@Override
public synchronized List<MetricEntity> queryByAppAndResourceBetween(String app, String resource,
long startTime, long endTime) {
public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource,
long startTime, long endTime) {
List<MetricEntity> results = new ArrayList<>();
if (StringUtil.isBlank(app)) {
return results;
}
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
ConcurrentLinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
LinkedHashMap<Long, MetricEntity> metricsMap = resourceMap.get(resource);
if (metricsMap == null) {
return results;
}
for (Entry<Long, MetricEntity> entry : metricsMap.entrySet()) {
if (entry.getKey() >= startTime && entry.getKey() <= endTime) {
results.add(entry.getValue());
readWriteLock.readLock().lock();
try {
for (Entry<Long, MetricEntity> entry : metricsMap.entrySet()) {
if (entry.getKey() >= startTime && entry.getKey() <= endTime) {
results.add(entry.getValue());
}
}
return results;
} finally {
readWriteLock.readLock().unlock();
}
return results;
}

@Override
Expand All @@ -101,44 +118,49 @@ public List<String> listResourcesOfApp(String app) {
return results;
}
// resource -> timestamp -> metric
Map<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
Map<String, LinkedHashMap<Long, MetricEntity>> resourceMap = allMetrics.get(app);
if (resourceMap == null) {
return results;
}
final long minTimeMs = System.currentTimeMillis() - 1000 * 60;
Map<String, MetricEntity> resourceCount = new ConcurrentHashMap<>(32);

for (Entry<String, ConcurrentLinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
}
MetricEntity newEntity = metrics.getValue();
if (resourceCount.containsKey(resourceMetrics.getKey())) {
MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
oldEntity.addPassQps(newEntity.getPassQps());
oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
oldEntity.addBlockQps(newEntity.getBlockQps());
oldEntity.addExceptionQps(newEntity.getExceptionQps());
oldEntity.addCount(1);
} else {
resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
readWriteLock.readLock().lock();
try {
for (Entry<String, LinkedHashMap<Long, MetricEntity>> resourceMetrics : resourceMap.entrySet()) {
for (Entry<Long, MetricEntity> metrics : resourceMetrics.getValue().entrySet()) {
if (metrics.getKey() < minTimeMs) {
continue;
}
MetricEntity newEntity = metrics.getValue();
if (resourceCount.containsKey(resourceMetrics.getKey())) {
MetricEntity oldEntity = resourceCount.get(resourceMetrics.getKey());
oldEntity.addPassQps(newEntity.getPassQps());
oldEntity.addRtAndSuccessQps(newEntity.getRt(), newEntity.getSuccessQps());
oldEntity.addBlockQps(newEntity.getBlockQps());
oldEntity.addExceptionQps(newEntity.getExceptionQps());
oldEntity.addCount(1);
} else {
resourceCount.put(resourceMetrics.getKey(), MetricEntity.copyOf(newEntity));
}
}
}
// Order by last minute b_qps DESC.
return resourceCount.entrySet()
.stream()
.sorted((o1, o2) -> {
MetricEntity e1 = o1.getValue();
MetricEntity e2 = o2.getValue();
int t = e2.getBlockQps().compareTo(e1.getBlockQps());
if (t != 0) {
return t;
}
return e2.getPassQps().compareTo(e1.getPassQps());
})
.map(Entry::getKey)
.collect(Collectors.toList());
} finally {
readWriteLock.readLock().unlock();
}
// Order by last minute b_qps DESC.
return resourceCount.entrySet()
.stream()
.sorted((o1, o2) -> {
MetricEntity e1 = o1.getValue();
MetricEntity e2 = o2.getValue();
int t = e2.getBlockQps().compareTo(e1.getBlockQps());
if (t != 0) {
return t;
}
return e2.getPassQps().compareTo(e1.getPassQps());
})
.map(Entry::getKey)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@
package com.alibaba.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;

import org.assertj.core.util.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.*;

Expand All @@ -37,13 +44,11 @@
*/
public class InMemoryMetricsRepositoryTest {

private static final String DEFAULT_APP = "default";
private static final String DEFAULT_EXPIRE_APP = "default_expire_app";
private static final String DEFAULT_RESOURCE = "test";
private final static String DEFAULT_APP = "defaultApp";
private final static String DEFAULT_RESOURCE = "defaultResource";
private static final long EXPIRE_TIME = 1000 * 60 * 5L;

private InMemoryMetricsRepository inMemoryMetricsRepository;

private ExecutorService executorService;

@Before
Expand All @@ -57,35 +62,58 @@ public void tearDown() {
executorService.shutdownNow();
}

private void testSave() {
for (int i = 0; i < 1000000; i++) {

@Test
public void testSave() {
MetricEntity entry = new MetricEntity();
entry.setApp("testSave");
entry.setResource("testResource");
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
List<String> resources = inMemoryMetricsRepository.listResourcesOfApp("testSave");
Assert.assertTrue(resources.size() == 1 && "testResource".equals(resources.get(0)));
}


@Test
public void testSaveAll() {
List<MetricEntity> entities = new ArrayList<>(10000);
for (int i = 0; i < 10000; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setApp("testSaveAll");
entry.setResource("testResource" + i);
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
entities.add(entry);
}
inMemoryMetricsRepository.saveAll(entities);
List<String> result = inMemoryMetricsRepository.listResourcesOfApp("testSaveAll");
Assert.assertTrue(result.size() == entities.size());
}


@Test
public void testExpireMetric() {
long now = System.currentTimeMillis();
MetricEntity expireEntry = new MetricEntity();
expireEntry.setApp(DEFAULT_EXPIRE_APP);
expireEntry.setApp(DEFAULT_APP);
expireEntry.setResource(DEFAULT_RESOURCE);
expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 10L));
expireEntry.setTimestamp(new Date(now - EXPIRE_TIME - 1L));
expireEntry.setPassQps(1L);
expireEntry.setExceptionQps(1L);
expireEntry.setBlockQps(0L);
expireEntry.setSuccessQps(1L);
inMemoryMetricsRepository.save(expireEntry);

MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_EXPIRE_APP);
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(now));
entry.setPassQps(1L);
Expand All @@ -95,47 +123,40 @@ public void testExpireMetric() {
inMemoryMetricsRepository.save(entry);

List<MetricEntity> list = inMemoryMetricsRepository.queryByAppAndResourceBetween(
DEFAULT_EXPIRE_APP, DEFAULT_RESOURCE, now - 2 * EXPIRE_TIME, now + EXPIRE_TIME);
DEFAULT_APP, DEFAULT_RESOURCE, now - EXPIRE_TIME, now);

assertFalse(CollectionUtils.isEmpty(list));
assertEquals(1, list.size());
assertTrue(list.get(0).getTimestamp().getTime() >= now - EXPIRE_TIME && list.get(0).getTimestamp().getTime() <= now);

}

@Test
public void testListResourcesOfApp() {
// prepare basic test data
testSave();
System.out.println( "[" + System.currentTimeMillis() + "] Basic test data ready in testListResourcesOfApp");

List<CompletableFuture> futures = Lists.newArrayList();
@Test
public void testConcurrentPutAndGet() {

// concurrent query resources of app
List<CompletableFuture> futures = new ArrayList<>(10000);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(8);

for (int j = 0; j < 10000; j++) {
futures.add(
CompletableFuture.runAsync(() -> {
final int finalJ = j;
futures.add(CompletableFuture.runAsync(() -> {
try {
cyclicBarrier.await();
inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP);
if (finalJ % 2 == 0) {
batchSave();
} else {
inMemoryMetricsRepository.listResourcesOfApp(DEFAULT_APP);
}

} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, executorService)

}, executorService)
);
}

// batch add metric entity
for (int i = 0; i < 10000; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis() - EXPIRE_TIME - 1000L));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
}

CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

Expand All @@ -155,4 +176,19 @@ public void testListResourcesOfApp() {
}
}

private void batchSave() {
for (int i = 0; i < 100; i++) {
MetricEntity entry = new MetricEntity();
entry.setApp(DEFAULT_APP);
entry.setResource(DEFAULT_RESOURCE);
entry.setTimestamp(new Date(System.currentTimeMillis()));
entry.setPassQps(1L);
entry.setExceptionQps(1L);
entry.setBlockQps(0L);
entry.setSuccessQps(1L);
inMemoryMetricsRepository.save(entry);
}
}


}