Merge pull request #1536 from grafana/sync-fakemetrics
bring in sync with upstream fakemetrics
Dieterbe authored Nov 27, 2019
2 parents 10991a1 + bad96f2 commit c94ca78
Showing 9 changed files with 474 additions and 15 deletions.
4 changes: 2 additions & 2 deletions stacktest/fakemetrics/fakemetrics.go
@@ -59,9 +59,9 @@ func NewFakeMetrics(metrics []*schema.MetricData, o out.Out, stats met.Backend)
return fm
}

func NewKafka(num int) *FakeMetrics {
func NewKafka(num int, timeout time.Duration, v2 bool) *FakeMetrics {
stats, _ := helper.New(false, "", "standard", "", "")
out, err := kafkamdm.New("mdm", []string{"localhost:9092"}, "none", stats, "lastNum")
out, err := kafkamdm.New("mdm", []string{"localhost:9092"}, "none", timeout, stats, "lastNum", v2)
if err != nil {
log.Fatalf("failed to create kafka-mdm output. %s", err.Error())
}
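The updated constructor now threads a Kafka timeout and the v2 flag through to the kafkamdm output. A minimal calling sketch (not part of this commit; the argument values are illustrative):

package main

import (
	"time"

	"github.com/grafana/metrictank/stacktest/fakemetrics"
)

func main() {
	// 1000 fake metrics, 5s Kafka network timeouts, v2 (MetricPoint) output enabled.
	// These values are illustrative only.
	fm := fakemetrics.NewKafka(1000, 5*time.Second, true)
	_ = fm
}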
58 changes: 46 additions & 12 deletions stacktest/fakemetrics/out/kafkamdm/kafkamdm.go
@@ -9,7 +9,9 @@ import (
"github.com/Shopify/sarama"
p "github.com/grafana/metrictank/cluster/partitioner"
"github.com/grafana/metrictank/schema"
"github.com/grafana/metrictank/schema/msg"
"github.com/grafana/metrictank/stacktest/fakemetrics/out"
"github.com/grafana/metrictank/stacktest/fakemetrics/out/kafkamdm/keycache"
"github.com/raintank/met"
log "github.com/sirupsen/logrus"
)
@@ -22,6 +24,8 @@ type KafkaMdm struct {
client sarama.SyncProducer
part p.Partitioner
numPartitions int32
v2 bool
keyCache *keycache.KeyCache
}

// map the last number in the metricname to the partition
@@ -41,7 +45,7 @@ func (p *LastNumPartitioner) Partition(m schema.PartitionedMetric, numPartitions
return int32(part), nil
}

func New(topic string, brokers []string, codec string, stats met.Backend, partitionScheme string) (*KafkaMdm, error) {
func New(topic string, brokers []string, codec string, timeout time.Duration, stats met.Backend, partitionScheme string, v2 bool) (*KafkaMdm, error) {
// We are looking for strong consistency semantics.
// Because we don't change the flush settings, sarama will try to produce messages
// as fast as possible to keep latency low.
@@ -52,12 +56,9 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
config.Producer.Compression = out.GetCompression(codec)
config.Producer.Partitioner = sarama.NewManualPartitioner

// set all timeouts a bit more aggressive so we can bail out quicker.
// useful for our unit tests, which operate in the orders of seconds anyway
// the defaults of 30s is too long for many of our tests.
config.Net.DialTimeout = 5 * time.Second
config.Net.ReadTimeout = 5 * time.Second
config.Net.WriteTimeout = 5 * time.Second
config.Net.DialTimeout = timeout
config.Net.ReadTimeout = timeout
config.Net.WriteTimeout = timeout
err := config.Validate()
if err != nil {
return nil, err
@@ -87,19 +88,24 @@ func New(topic string, brokers []string, codec string, stats met.Backend, partit
} else {
part, err = p.NewKafka(partitionScheme)
if err != nil {
return nil, fmt.Errorf("partitionScheme must be one of 'byOrg|bySeries|bySeriesWithTags|lastNum'. got %s", partitionScheme)
return nil, fmt.Errorf("partitionscheme must be one of 'byOrg|bySeries|bySeriesWithTags|bySeriesWithTagsFnv|lastNum'. got %s", partitionScheme)
}
}

return &KafkaMdm{
k := &KafkaMdm{
OutStats: out.NewStats(stats, "kafka-mdm"),
topic: topic,
brokers: brokers,
config: config,
client: producer,
part: part,
numPartitions: int32(len(partitions)),
}, nil
v2: v2,
}
if v2 {
k.keyCache = keycache.NewKeyCache(20*time.Minute, time.Duration(10)*time.Minute)
}
return k, nil
}

func (k *KafkaMdm) Close() error {
@@ -114,12 +120,37 @@ func (k *KafkaMdm) Flush(metrics []*schema.MetricData) error {
preFlush := time.Now()

k.MessageMetrics.Value(1)
var data []byte

payload := make([]*sarama.ProducerMessage, len(metrics))
var notOk int

for i, metric := range metrics {
data, err := metric.MarshalMsg(data[:])
var data []byte
var err error
if k.v2 {
var mkey schema.MKey
mkey, err = schema.MKeyFromString(metric.Id)
if err != nil {
return err
}
ok := k.keyCache.Touch(mkey, preFlush)
// we've seen this key recently. we can use the optimized format
if ok {
mp := schema.MetricPoint{
MKey: mkey,
Value: metric.Value,
Time: uint32(metric.Time),
}
data = []byte{byte(msg.FormatMetricPoint)}
data, err = mp.Marshal(data)

} else {
notOk++
data, err = metric.MarshalMsg(data[:])
}
} else {
data, err = metric.MarshalMsg(data[:])
}
if err != nil {
return err
}
@@ -138,6 +169,9 @@ }
}

}
if notOk > 0 {
log.Info(notOk, "metrics could not be sent as v2 MetricPoint")
}
prePub := time.Now()
err := k.client.SendMessages(payload)
if err != nil {
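The new Flush path gates the compact encoding on the keycache: only keys that Touch reports as recently seen are sent as a MetricPoint; everything else (and everything when v2 is off) is sent as a full MetricData, presumably so consumers see the complete definition at least once. A standalone sketch of that decision, using the same schema, msg and keycache packages as the diff (the helper name encodeOne is hypothetical and not part of this commit):

package kafkamdm

import (
	"time"

	"github.com/grafana/metrictank/schema"
	"github.com/grafana/metrictank/schema/msg"
	"github.com/grafana/metrictank/stacktest/fakemetrics/out/kafkamdm/keycache"
)

// encodeOne encodes a single metric the way Flush does above: compact
// MetricPoint for recently seen keys, full MetricData otherwise.
func encodeOne(kc *keycache.KeyCache, m *schema.MetricData, now time.Time) ([]byte, error) {
	mkey, err := schema.MKeyFromString(m.Id)
	if err != nil {
		return nil, err
	}
	if kc.Touch(mkey, now) {
		// seen recently: one format byte, then the marshalled MetricPoint
		mp := schema.MetricPoint{
			MKey:  mkey,
			Value: m.Value,
			Time:  uint32(m.Time),
		}
		return mp.Marshal([]byte{byte(msg.FormatMetricPoint)})
	}
	// not seen recently: fall back to the full MetricData encoding
	return m.MarshalMsg(nil)
}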
51 changes: 51 additions & 0 deletions stacktest/fakemetrics/out/kafkamdm/keycache/cache.go
@@ -0,0 +1,51 @@
package keycache

import (
"time"

"github.com/grafana/metrictank/schema"
)

// Cache is a single-tenant keycache
// it is sharded for 2 reasons:
// * more granular GC (eg. less latency perceived by caller)
// * mild space savings cause keys are 1 byte shorter
// We shard on the first byte of the metric key, which we assume
// is evenly distributed.
type Cache struct {
shards [256]Shard
}

// NewCache creates a new cache
func NewCache(ref Ref) *Cache {
c := Cache{}
for i := 0; i < 256; i++ {
c.shards[i] = NewShard(ref)
}
return &c
}

// Touch marks the key as seen and returns whether it was seen before
// callers should assure that t >= ref and t-ref <= 42 hours
func (c *Cache) Touch(key schema.Key, t time.Time) bool {
shard := int(key[0])
return c.shards[shard].Touch(key, t)
}

// Len returns the length of the cache
func (c *Cache) Len() int {
var sum int
for i := 0; i < 256; i++ {
sum += c.shards[i].Len()
}
return sum
}

// Prune makes sure all shards are pruned
func (c *Cache) Prune(now time.Time, staleThresh Duration) int {
var remaining int
for i := 0; i < 256; i++ {
remaining += c.shards[i].Prune(now, staleThresh)
}
return remaining
}
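Cache shards on the first byte of the metric key, so each shard holds roughly 1/256th of the keys when keys are evenly distributed. A small in-package sketch (not part of this commit; it assumes schema.Key's zero value is a usable all-zero key, and uses NewRef from keycache.go below):

package keycache

import (
	"fmt"
	"time"

	"github.com/grafana/metrictank/schema"
)

func exampleCacheTouch() {
	now := time.Now()
	c := NewCache(NewRef(now))
	var key schema.Key // zero-valued key, purely for illustration
	fmt.Println(c.Touch(key, now))                  // false: not seen before
	fmt.Println(c.Touch(key, now.Add(time.Minute))) // true: seen just above
	fmt.Println(c.Len())                            // number of keys tracked so far
}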
103 changes: 103 additions & 0 deletions stacktest/fakemetrics/out/kafkamdm/keycache/keycache.go
@@ -0,0 +1,103 @@
package keycache

import (
"sync"
"time"

"github.com/grafana/metrictank/schema"
)

// KeyCache tracks for all orgs, which keys have been seen, and when was the last time
type KeyCache struct {
staleThresh Duration // number of 10-minutely periods
pruneInterval time.Duration

sync.RWMutex
caches map[uint32]*Cache
}

// NewKeyCache creates a new KeyCache
func NewKeyCache(staleThresh, pruneInterval time.Duration) *KeyCache {
if staleThresh.Hours() > 42 {
panic("stale time may not exceed 42 hours due to resolution of internal bookkeeping")
}
if pruneInterval.Hours() > 42 {
panic("prune interval may not exceed 42 hours due to resolution of internal bookkeeping")
}
if pruneInterval.Minutes() < 10 {
panic("prune interval less than 10 minutes is useless due to resolution of internal bookkeeping")
}
k := &KeyCache{
pruneInterval: pruneInterval,
staleThresh: Duration(int(staleThresh.Seconds()) / 600),
caches: make(map[uint32]*Cache),
}
go k.prune()
return k
}

// Touch marks the key as seen and returns whether it was seen before
// callers should assure that t >= ref and t-ref <= 42 hours
func (k *KeyCache) Touch(key schema.MKey, t time.Time) bool {
k.RLock()
cache, ok := k.caches[key.Org]
k.RUnlock()
// most likely this branch won't execute
if !ok {
k.Lock()
// check again in case another routine has just added it
cache, ok = k.caches[key.Org]
if !ok {
cache = NewCache(NewRef(t))
k.caches[key.Org] = cache
}
k.Unlock()
}
return cache.Touch(key.Key, t)
}

// Len returns the size across all orgs
func (k *KeyCache) Len() int {
var sum int
k.RLock()
caches := make([]*Cache, 0, len(k.caches))
for _, c := range k.caches {
caches = append(caches, c)
}
k.RUnlock()
for _, c := range caches {
sum += c.Len()
}
return sum
}

// prune makes sure each org's cache is pruned
func (k *KeyCache) prune() {
tick := time.NewTicker(k.pruneInterval)
for now := range tick.C {

type target struct {
org uint32
cache *Cache
}

k.RLock()
targets := make([]target, 0, len(k.caches))
for org, c := range k.caches {
targets = append(targets, target{
org,
c,
})
}
k.RUnlock()

for _, t := range targets {
size := t.cache.Prune(now, k.staleThresh)
if size == 0 {
k.Lock()
delete(k.caches, t.org)
k.Unlock()
}
}
}
}
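For completeness, a brief calling sketch for the multi-tenant wrapper (not part of this commit). The 20-minute stale threshold and 10-minute prune interval mirror what kafkamdm.New passes above; the metric id string is made up and assumes the usual "<org>.<32-char hex>" form of MetricData.Id. The 42-hour caps presumably exist because the bookkeeping tracks time in 10-minute periods relative to a reference, and 42 hours is 252 such periods.

package main

import (
	"time"

	"github.com/grafana/metrictank/schema"
	"github.com/grafana/metrictank/stacktest/fakemetrics/out/kafkamdm/keycache"
)

func main() {
	// same values kafkamdm.New uses when v2 is enabled
	kc := keycache.NewKeyCache(20*time.Minute, 10*time.Minute)

	// made-up id in the assumed "<org>.<hex>" form of MetricData.Id
	mkey, err := schema.MKeyFromString("1.0123456789abcdef0123456789abcdef")
	if err != nil {
		panic(err)
	}
	seen := kc.Touch(mkey, time.Now()) // false on the first touch of this key
	_ = seen
}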
