Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add values policy to mt-fakemetrics #1773

Merged
merged 7 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions cmd/mt-fakemetrics/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## fakemetrics
# Fakemetrics

fakemetrics generates a metrics workload with fake data, that you can feed into kafka, carbon, etc.
Fakemetrics generates a metrics workload with fake data, that you can feed into kafka, carbon, etc.

## Example invocations

Expand Down Expand Up @@ -42,14 +42,58 @@ The speed is 100x what it would be if it were realtime (so a rate of 4x100x100=4
mt-fakemetrics backfill --kafka-mdm-addr localhost:9092 --offset 5h --period 10s --speedup 100 --orgs 4 --mpo 100
```

# Outputs
## Value Policies

If no Value Policy is specified a random float will be generated

You can specify a Value Policy with the `--value-policy` flag. Currently the following Value Policies are supported:

* single
* multiple
* timestamp

This flag can be useful when it is important to know what data fakemetrics generates to verify correctness of the data returned

### Single

This policy allows you to set one single value that all points will use. This makes it very easy to identify potential errors in your configuration or deployment.

Example:
```
mt-fakemetrics backfill --kafka-mdm-addr localhost:9092 --offset 5h --period 10s --speedup 100 --orgs 1 --mpo 100 --value-policy single:2.5
```

Note that there are no spaces in the policy setting itself.

### Multiple

The `multiple` policy allows you to pass in a list of values which will be iterated over. Upon reaching the end of your list it will start over at the beginning.

Example:
```
mt-fakemetrics backfill --kafka-mdm-addr localhost:9092 --offset 5h --period 10s --speedup 100 --orgs 1 --mpo 100 --value-policy multiple:1,2.152,3.14,4,5
```

Note that there are no spaces in the policy setting itself, and it uses commas to separate each value.

### Timestamp

This policy sets the value of each point to its timestamp.

```
mt-fakemetrics backfill --kafka-mdm-addr localhost:9092 --offset 5h --period 10s --speedup 100 --orgs 1 --mpo 100 --value-policy timestamp
```

Do not use a colon (':') with this policy, as you can't specify a value with this.

## Outputs

kafka and stdout are multi-tenant outputs where structured data is sent and multiple orgs may have the same key in their own namespace.
carbon and gnet (short for grafana.net or more specifically the [tsdb-gw](https://github.com/raintank/tsdb-gw) service) are single-tenant.
so in that case you can only simulate one org otherwise the keys would overwrite each other.
for the gnet output, the org-id will be set to whatever you authenticate as (unless you use the admin key),

# Important
## Important

we use ticker based loops in which we increment timestamps and call output Flush methods.
if a loop iteration takes too long (due to an output's Flush taking too long for example),
Expand Down
10 changes: 9 additions & 1 deletion cmd/mt-fakemetrics/cmd/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd
import (
"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/policy"
"github.com/spf13/cobra"
)

Expand All @@ -28,7 +29,13 @@ var backfillCmd = &cobra.Command{
period = int(periodDur.Seconds())
flush = int(flushDur.Nanoseconds() / 1000 / 1000)
outs := getOutputs()
dataFeed(outs, orgs, mpo, period, flush, int(offset.Seconds()), speedup, true, TaggedBuilder{metricName})

vp, err := policy.ParseValuePolicy(valuePolicy)
if err != nil {
panic(err)
}

dataFeed(outs, orgs, mpo, period, flush, int(offset.Seconds()), speedup, true, TaggedBuilder{metricName}, vp)
},
}

Expand All @@ -41,4 +48,5 @@ func init() {
backfillCmd.Flags().IntVar(&speedup, "speedup", 1, "for each advancement of real time, how many advancements of fake data to simulate")
backfillCmd.Flags().DurationVar(&flushDur, "flush", time.Second, "how often to flush metrics")
backfillCmd.Flags().DurationVar(&periodDur, "period", time.Second, "period between metric points (must be a multiple of 1s)")
backfillCmd.Flags().StringVar(&valuePolicy, "value-policy", "", "a value policy (i.e. \"single:1\" \"multiple:1,2,3,4,5\" \"timestamp\")")
}
7 changes: 4 additions & 3 deletions cmd/mt-fakemetrics/cmd/datafeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package cmd

import (
"fmt"
"math/rand"
"strconv"
"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
"github.com/grafana/metrictank/cmd/mt-fakemetrics/policy"
"github.com/grafana/metrictank/schema"
"github.com/raintank/worldping-api/pkg/log"
)
Expand Down Expand Up @@ -136,7 +136,7 @@ func (tb TaggedBuilder) Build(orgs, mpo, period int) [][]schema.MetricData {
// period in seconds
// flush in ms
// offset in seconds
func dataFeed(outs []out.Out, orgs, mpo, period, flush, offset, speedup int, stopAtNow bool, builder MetricPayloadBuilder) {
func dataFeed(outs []out.Out, orgs, mpo, period, flush, offset, speedup int, stopAtNow bool, builder MetricPayloadBuilder, vp policy.ValuePolicy) {
flushDur := time.Duration(flush) * time.Millisecond

if mpo*speedup%period != 0 {
Expand Down Expand Up @@ -213,7 +213,8 @@ times %4d orgs: each %s, flushing %d metrics so rate of %d Hz. (%d total unique
ts += mp
}
metricData.Time = ts
metricData.Value = rand.Float64() * float64(m+1)
metricData.Value = vp.Value(ts)

data = append(data, &metricData)
}
startFrom = (m + 1) % mpo
Expand Down
10 changes: 9 additions & 1 deletion cmd/mt-fakemetrics/cmd/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cmd
import (
"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/policy"
"github.com/spf13/cobra"
)

Expand All @@ -28,7 +29,13 @@ var feedCmd = &cobra.Command{
period = int(periodDur.Seconds())
flush = int(flushDur.Nanoseconds() / 1000 / 1000)
outs := getOutputs()
dataFeed(outs, orgs, mpo, period, flush, 0, 1, false, TaggedBuilder{metricName})

vp, err := policy.ParseValuePolicy(valuePolicy)
if err != nil {
panic(err)
}

dataFeed(outs, orgs, mpo, period, flush, 0, 1, false, TaggedBuilder{metricName}, vp)

},
}
Expand All @@ -40,4 +47,5 @@ func init() {
feedCmd.Flags().IntVar(&mpo, "mpo", 100, "how many metrics per org to simulate")
feedCmd.Flags().DurationVar(&flushDur, "flush", time.Second, "how often to flush metrics")
feedCmd.Flags().DurationVar(&periodDur, "period", time.Second, "period between metric points (must be a multiple of 1s)")
feedCmd.Flags().StringVar(&valuePolicy, "value-policy", "", "a value policy (i.e. \"single:1\" \"multiple:1,2,3,4,5\" \"timestamp\")")
}
2 changes: 2 additions & 0 deletions cmd/mt-fakemetrics/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ var (
customTags []string
numUniqueCustomTags int

valuePolicy string

kafkaMdmAddr string
kafkaMdmTopic string
kafkaMdmV2 bool
Expand Down
8 changes: 7 additions & 1 deletion cmd/mt-fakemetrics/cmd/schemasbackfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"time"

"github.com/grafana/metrictank/cmd/mt-fakemetrics/policy"
"github.com/grafana/metrictank/conf"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -38,6 +39,7 @@ func init() {
schemasbackfillCmd.Flags().IntVar(&speedup, "speedup", 1, "for each advancement of real time, how many advancements of fake data to simulate")
schemasbackfillCmd.Flags().DurationVar(&flushDur, "flush", time.Second, "how often to flush metrics")
schemasbackfillCmd.Flags().DurationVar(&periodDur, "period", time.Second, "period between metric points (must be a multiple of 1s)")
schemasbackfillCmd.Flags().StringVar(&valuePolicy, "value-policy", "", "a value policy (i.e. \"single:1\" \"multiple:1,2,3,4,5\" \"timestamp\")")
}

// schemasbackfillCmd represents the schemasbackfill command
Expand Down Expand Up @@ -77,7 +79,11 @@ var schemasbackfillCmd = &cobra.Command{
name = "default"
}
go func(name string, period int) {
dataFeed(outs, 1, mpr, period, flush, int(offset.Seconds()), speedup, true, SimpleBuilder{name})
vp, err := policy.ParseValuePolicy(valuePolicy)
if err != nil {
panic(err)
}
dataFeed(outs, 1, mpr, period, flush, int(offset.Seconds()), speedup, true, SimpleBuilder{name}, vp)
wg.Done()
}(name, schema.Retentions.Rets[0].SecondsPerPoint)
}
Expand Down
109 changes: 109 additions & 0 deletions cmd/mt-fakemetrics/policy/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package policy

import (
"fmt"
"math/rand"
"strconv"
"strings"
)

type ValuePolicy interface {
Value(ts int64) float64
}

type ValuePolicyRandom struct {
}

func (v *ValuePolicyRandom) Value(ts int64) float64 {
return rand.Float64() * float64(rand.Int63n(10))
}

type ValuePolicyTimestamp struct {
}

func (v *ValuePolicyTimestamp) Value(ts int64) float64 {
return float64(ts)
}

type ValuePolicySingle struct {
val float64
}

func (v *ValuePolicySingle) Value(ts int64) float64 {
return v.val
}

type ValuePolicyMultiple struct {
lastTs int64
idx int
vals []float64
}

func (v *ValuePolicyMultiple) Value(ts int64) float64 {
if v.lastTs == 0 {
v.lastTs = ts
}

if ts == v.lastTs {
return v.vals[v.idx]
}

v.lastTs = ts
v.idx++
if v.idx >= len(v.vals) {
v.idx = 0
}
return v.vals[v.idx]
}

func ParseValuePolicy(p string) (ValuePolicy, error) {
if p == "" {
return &ValuePolicyRandom{}, nil
}

if strings.TrimSpace(p) == "timestamp" {
return &ValuePolicyTimestamp{}, nil
}

split := strings.Index(p, ":")
if split == -1 {
return nil, fmt.Errorf("error parsing Values Policy - separator (':') not found: %s\nMake sure you don't have any spaces in your value-policy argument", p)
}

switch strings.TrimSpace(p[:split]) {
case "single":
val, err := strconv.ParseFloat(strings.TrimSpace(p[split+1:]), 64)
if err != nil {
return nil, fmt.Errorf("could not parse value: %s", p[split+1:])
}
return &ValuePolicySingle{
val: val,
}, nil
case "multiple":
vals, err := parseValues(strings.TrimSpace(p[split+1:]))
if err != nil {
return nil, err
}
if len(vals) < 2 {
return nil, fmt.Errorf("'multiple' Values Policy used, but less than 2 values were specified. Maybe you wanted to use 'single'?")
}
return &ValuePolicyMultiple{
vals: vals,
}, nil
default:
return nil, fmt.Errorf("error parsing Values Policy: %s", p)
}
}

func parseValues(v string) ([]float64, error) {
var vals []float64
for _, s := range strings.Split(v, ",") {
if n, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil {
vals = append(vals, n)
} else {
return vals, fmt.Errorf("could not parse value: %s", s)
}
}

return vals, nil
}