This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add a "Containers" mode to fakemetrics with configurable churn #1859

Merged: 2 commits merged on Jul 14, 2020
67 changes: 67 additions & 0 deletions clock/randomticker.go
@@ -0,0 +1,67 @@
package clock

import (
	"math/rand"
	"time"
)

// RandomTicker ticks at dur +- jitter
type RandomTicker struct {
	C        chan time.Time
	stopc    chan struct{}
	dur      time.Duration
	jitt     time.Duration
	blocking bool
}

// NewRandomTicker returns a new RandomTicker containing a channel that will send the time with a period specified by the duration and jitter arguments.
// It adjusts the intervals or drops ticks to make up for slow receivers.
// The duration must be greater than zero and must be >= the jitter; the jitter may be zero for a fixed-period ticker.
// Stop the ticker to release associated resources.
func NewRandomTicker(dur, jitt time.Duration, blocking bool) *RandomTicker {
	rt := RandomTicker{
		C:        make(chan time.Time),
		stopc:    make(chan struct{}),
		dur:      dur,
		jitt:     jitt,
		blocking: blocking,
	}
	go rt.run()
	return &rt
}

// Stop turns off the ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".
func (rt *RandomTicker) Stop() {
	rt.stopc <- struct{}{}
	<-rt.stopc
}

func (rt *RandomTicker) run() {
	for {
		t := time.NewTimer(rt.duration())
		select {
		case <-rt.stopc:
			t.Stop()
			close(rt.stopc)
			return
		case <-t.C:
			if rt.blocking {
				rt.C <- time.Now()
			} else {
				select {
				case rt.C <- time.Now():
				default:
				}
			}
			t.Stop()
		}
	}
}

func (rt *RandomTicker) duration() time.Duration {
	if rt.jitt > 0 {
		return rt.dur + time.Duration(rand.Int63n(int64(2*rt.jitt))) - rt.jitt
	}
	return rt.dur
}
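
As a usage illustration (not part of the diff), here is a minimal sketch of how a caller might drive the new ticker; the one-second period and 200ms jitter are arbitrary example values:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/metrictank/clock"
)

func main() {
	// fire roughly every second, +/- 200ms; blocking=false means ticks are
	// dropped rather than queued when the receiver falls behind
	rt := clock.NewRandomTicker(time.Second, 200*time.Millisecond, false)
	defer rt.Stop()

	for i := 0; i < 5; i++ {
		t := <-rt.C
		fmt.Println("tick at", t)
	}
}
```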
159 changes: 159 additions & 0 deletions cmd/mt-fakemetrics/cmd/containers.go
@@ -0,0 +1,159 @@
// Copyright © 2018 Grafana Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
	"fmt"
	"math/rand"
	"strconv"
	"strings"
	"time"

	"github.com/spf13/cobra"
	log "github.com/sirupsen/logrus"

	"github.com/grafana/metrictank/clock"
	"github.com/grafana/metrictank/cmd/mt-fakemetrics/out"
	"github.com/grafana/metrictank/cmd/mt-fakemetrics/policy"
	"github.com/grafana/metrictank/schema"
)

var containersCmd = &cobra.Command{
	Use:   "containers",
	Short: "Mimic a set of containers - with churn - whose stats get reported at the same time",
	Run: func(cmd *cobra.Command, args []string) {
		checkOutputs()
		period = int(periodDur.Seconds())

		// since we have so many small outputs, they would each be sending the same data, which would be a bit much
		initStats(false, "containers")
		churnSplit := strings.Split(churn, ":")
		if len(churnSplit) != 3 {
			log.Fatalf("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
		}
		cycleDur, err := time.ParseDuration(churnSplit[0])
		if err != nil {
			log.Fatalf("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
		}
		cycleJitter, err := time.ParseDuration(churnSplit[1])
		if err != nil {
			log.Fatalf("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
		}
		cyclePct, err := strconv.Atoi(churnSplit[2])
		if err != nil {
			log.Fatalf("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
		}
		if cyclePct < 0 || cyclePct > 100 {
			log.Fatalf("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
		}

		rand.Seed(time.Now().UnixNano())

		for i := 0; i < numContainers; i++ {
			var key [8]byte
			rand.Read(key[:])
			containers = append(containers, key)
		}

		vp, err := policy.ParseValuePolicy(valuePolicy)
		if err != nil {
			panic(err)
		}
		monitorContainers(cycleDur, cycleJitter, cyclePct, vp)
	},
}

var (
	containers            [][8]byte
	numContainers         int
	numContainersPerBatch int
	metricsPerContainer   int
	churn                 string
)

func init() {
	rootCmd.AddCommand(containersCmd)
	containersCmd.Flags().IntVar(&numContainers, "containers", 1000, "how many containers to simulate")
	containersCmd.Flags().IntVar(&numContainersPerBatch, "batch", 10, "how many containers' metrics should go into each batch")
	containersCmd.Flags().IntVar(&metricsPerContainer, "metrics", 10, "how many metrics per container to simulate")
	containersCmd.Flags().DurationVar(&periodDur, "period", 10*time.Second, "period between metric points (must be a multiple of 1s)")
	containersCmd.Flags().StringVar(&churn, "churn", "4h:0h:50", "churn spec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
	containersCmd.Flags().StringVar(&valuePolicy, "value-policy", "", "a value policy (e.g. \"single:1\" \"multiple:1,2,3,4,5\" \"timestamp\" \"daily-sine:<peak>,<offset>,<stdev>\")")
}

// containerChurn keeps pct% of the existing containers (random selection) and replaces the others with new ones.
// During this process, the containers slice also gets shuffled.
func containerChurn(pct int) {
	rand.Shuffle(len(containers), func(i, j int) {
		containers[i], containers[j] = containers[j], containers[i]
	})
	for i := pct * len(containers) / 100; i < len(containers); i++ {
		var key [8]byte
		rand.Read(key[:])
		containers[i] = key
	}
}

func containerFlush(now time.Time, vp policy.ValuePolicy, out out.Out) {
	var batch []*schema.MetricData
	ts := now.Unix()
	var numContainers int // number of containers accumulated into the current batch
	for _, id := range containers {
		for i := 0; i < metricsPerContainer; i++ {
			met := &schema.MetricData{
				Name:     fmt.Sprintf("mt-fakemetrics.container.%x.metric.%d", id, i),
				OrgId:    1,
				Interval: period,
				Value:    vp.Value(ts),
				Unit:     "ms",
				Mtype:    "gauge",
				Time:     ts,
			}
			met.SetId()
			batch = append(batch, met)
		}
		numContainers++
		if numContainers == numContainersPerBatch {
			err := out.Flush(batch)
			if err != nil {
				log.Error(err.Error())
			}
			numContainers = 0
			batch = batch[:0]
		}
	}
	if numContainers > 0 {
		err := out.Flush(batch)
		if err != nil {
			log.Error(err.Error())
		}
	}
}

func monitorContainers(cycleDur, cycleJitter time.Duration, cyclePct int, vp policy.ValuePolicy) {
	out := getOutput()
	tickFlush := clock.AlignedTickLossless(periodDur)
	tickChurn := clock.NewRandomTicker(cycleDur, cycleJitter, true)
	for {
		select {
		case t := <-tickFlush:
			containerFlush(t, vp, out)
		case <-tickChurn.C:
			containerChurn(cyclePct)
		}
	}
}
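
For orientation (not part of the diff), here is a minimal standalone sketch of how a churn spec string like the default `4h:0h:50` breaks down, parsed the same way the Run function above does; the `1h:10m:25` value and the omitted error handling are purely illustrative:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func main() {
	// illustrative churn spec: <cycle time>:<cycle time noise>:<pct of containers to cycle>
	spec := "1h:10m:25"
	parts := strings.Split(spec, ":")
	if len(parts) != 3 {
		panic("churnspec: <cycle time>:<cycle time noise>:<pct of containers to cycle>")
	}

	cycleDur, _ := time.ParseDuration(parts[0])    // base interval between churn cycles
	cycleJitter, _ := time.ParseDuration(parts[1]) // random noise applied to each interval
	cyclePct, _ := strconv.Atoi(parts[2])          // percentage field of the spec

	fmt.Printf("cycle=%v jitter=%v pct=%d\n", cycleDur, cycleJitter, cyclePct)
	// fed to clock.NewRandomTicker(cycleDur, cycleJitter, true), churn cycles then fire
	// at a random interval in [cycleDur-cycleJitter, cycleDur+cycleJitter), i.e. [50m, 70m) here
}
```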
1 change: 1 addition & 0 deletions docs/tools.md
@@ -71,6 +71,7 @@ Available Commands:
agginput A particular workload good to test performance of carbon-relay-ng aggregators
backfill backfills old data and stops when 'now' is reached
bad Sends out invalid/out-of-order/duplicate metric data
containers Mimic a set of containers - with churn - whose stats get reported at the same time
feed Publishes a realtime feed of data
help Help about any command
resolutionchange Sends out metric with changing intervals, time range 24hours