-
Notifications
You must be signed in to change notification settings - Fork 2
/
cache.go
105 lines (81 loc) · 3.09 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// cache is the package-level metricCache instance. Its zero value is usable:
// the underlying map is created lazily by initializeIfNil (called from append).
var (
cache metricCache
)
// metricData is a single cache entry: a metric together with the time at
// which it was added to the cache (set to time.Now().UTC() by append).
type metricData struct {
timestamp time.Time
metric metric
}
// metricCache stores, per deployment id, the list of metric entries recorded
// for that deployment. New entries are appended at the end of the list and
// purge drops entries from the front.
//
// NOTE(review): no locking is visible in this type — confirm that callers
// serialize access if it is used from multiple goroutines.
type metricCache struct {
cache map[string][]metricData // map: deploymentid => metricData
}
// initializeIfNil allocates the underlying map on first use so that the zero
// value of metricCache can be written to. It is a no-op once the map exists.
func (c *metricCache) initializeIfNil() {
	if c.cache != nil {
		return
	}
	c.cache = map[string][]metricData{}
	log.Debug("cache initialized")
}
// getSize reports how many entries are cached for deploymentid.
// An unknown deploymentid (or an uninitialized cache) yields 0.
func (c *metricCache) getSize(deploymentid string) int {
	entries := c.cache[deploymentid]
	return len(entries)
}
// isEmpty reports whether no entries are cached for deploymentid.
func (c *metricCache) isEmpty(deploymentid string) bool {
	size := c.getSize(deploymentid)
	return size == 0
}
// append records metric for deploymentid, stamped with the current UTC time,
// and then purges entries of that deployment that are older than
// scalePeriodSeconds. The underlying map is created on first use.
func (c *metricCache) append(deploymentid string, metric metric, scalePeriodSeconds int64) {
	c.initializeIfNil()

	log.Debugf("[deploymentid: %v] appending metric {name: %v, value: %v}", deploymentid, metric.name, metric.value)
	entry := metricData{
		timestamp: time.Now().UTC(),
		metric:    metric,
	}
	c.cache[deploymentid] = append(c.cache[deploymentid], entry)
	log.Debugf("[deploymentid: %v] appended metric {name: %v, value: %v}", deploymentid, metric.name, metric.value)

	// Trim expired entries right away so the list only spans the scale period.
	c.purge(deploymentid, scalePeriodSeconds)
}
// getPurgeIndex returns the number of entries at the front of deploymentid's
// list that are older than scalePeriodSeconds, i.e. the index of the first
// entry that must be kept. purge uses the result as the cut point list[index:].
//
// Entries are appended in arrival order, so expired entries form a prefix of
// the list. The scan stops at the first entry that is still inside the window,
// which guarantees that cutting at the returned index never drops a live
// entry. Counting expired entries anywhere in the list would not give that
// guarantee: timestamps are stored via UTC(), which strips the monotonic clock
// reading, so a wall-clock adjustment can leave an expired entry behind a
// live one.
//
// An entry is expired only when its age is strictly greater than
// scalePeriodSeconds. The result is 0 for an unknown or empty deploymentid.
func (c *metricCache) getPurgeIndex(deploymentid string, scalePeriodSeconds int64) int64 {
	var index int64
	now := time.Now().UTC()
	maxAgeSeconds := float64(scalePeriodSeconds)
	for _, d := range c.cache[deploymentid] {
		if now.Sub(d.timestamp).Seconds() <= maxAgeSeconds {
			// First entry still within the window: everything before it is
			// expired, and nothing from here on may be cut.
			break
		}
		index++
	}
	log.Debugf("[deploymentid: %v] number of values to purge: %v", deploymentid, index)
	return index
}
// purge drops the expired entries of deploymentid, using the cut point
// reported by getPurgeIndex (entries whose age is strictly greater than
// scalePeriodSeconds, e.g. with scalePeriodSeconds = 600 an entry aged exactly
// 600s is kept). If the list ends up empty, its map slot is deleted as well;
// append re-creates the slot if the deployment shows up again.
func (c *metricCache) purge(deploymentid string, scalePeriodSeconds int64) {
	log.Debugf("[deploymentid: %v] purging metric values [%v = %v]", deploymentid, keyScalePeriodSeconds, scalePeriodSeconds)

	entries := c.cache[deploymentid]
	if len(entries) == 0 {
		log.Debugf("[deploymentid: %v] cache is already empty, purge not needed", deploymentid)
		return
	}

	// Cut the expired prefix off the list, if there is one.
	if cut := c.getPurgeIndex(deploymentid, scalePeriodSeconds); cut > 0 {
		before := len(entries)
		entries = entries[cut:]
		c.cache[deploymentid] = entries
		after := len(entries)
		log.Infof("[deploymentid: %v] purged %v value(s). cache size: {old: %v, new: %v}", deploymentid, before-after, before, after)
	}

	// Do not keep an empty list around: release the slot for this deployment.
	if len(entries) == 0 {
		delete(c.cache, deploymentid)
		log.Infof("[deploymentid: %v] empty cache slot purged completely", deploymentid)
	}
}
// getOldestMetricData returns the first (earliest appended) entry cached for
// deploymentid. When nothing is cached for that deployment it returns a zero
// metricData and a gRPC status error with code NotFound.
func (c *metricCache) getOldestMetricData(deploymentid string) (metricData, error) {
	entries := c.cache[deploymentid]
	if len(entries) == 0 {
		var none metricData
		return none, status.Errorf(codes.NotFound, "[deploymentid: %v] cache is empty", deploymentid)
	}
	return entries[0], nil
}