Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1161 from grafana/notifier-stuff
Browse files Browse the repository at this point in the history
notifier: deprecate NSQ ( rip :'( ) + add mt-kafka-persist-sniff tool
  • Loading branch information
Dieterbe authored Dec 7, 2018
2 parents 38d5118 + 048e1d1 commit 61ea62a
Show file tree
Hide file tree
Showing 51 changed files with 174 additions and 5,882 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/cmd/mt-index-migrate/mt-index-migrate
/cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
37 changes: 0 additions & 37 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 0 additions & 12 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ unused-packages = true
name = "github.com/alyu/configparser"
branch = "master"

[[constraint]]
name = "github.com/bitly/go-hostpool"
branch = "master"

[[constraint]]
name = "github.com/davecgh/go-spew"
version = "1.1.0"
Expand Down Expand Up @@ -97,10 +93,6 @@ unused-packages = true
name = "github.com/mitchellh/go-homedir"
branch = "master"

[[constraint]]
name = "github.com/nsqio/go-nsq"
revision = "642a3f9935f12cb3b747294318d730f56f4c34b4"

[[constraint]]
name = "github.com/opentracing/opentracing-go"
version = "^1"
Expand All @@ -117,10 +109,6 @@ unused-packages = true
name = "github.com/raintank/met"
branch = "master"

[[constraint]]
name = "github.com/raintank/misc"
branch = "master"

[[constraint]]
name = "github.com/raintank/schema"
version = "^2"
Expand Down
17 changes: 4 additions & 13 deletions cmd/metrictank/metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
"github.com/grafana/metrictank/mdata/notifierKafka"
"github.com/grafana/metrictank/mdata/notifierNsq"
"github.com/grafana/metrictank/stats"
statsConfig "github.com/grafana/metrictank/stats/config"
bigtableStore "github.com/grafana/metrictank/store/bigtable"
Expand Down Expand Up @@ -112,9 +111,6 @@ func main() {
inKafkaMdm.ConfigSetup()
inPrometheus.ConfigSetup()

// load config for cluster handlers
notifierNsq.ConfigSetup()

// load config for metricIndexers
memory.ConfigSetup()
cassandra.ConfigSetup()
Expand Down Expand Up @@ -184,7 +180,6 @@ func main() {
inKafkaMdm.ConfigProcess(*instance)
memory.ConfigProcess()
inPrometheus.ConfigProcess()
notifierNsq.ConfigProcess()
notifierKafka.ConfigProcess(*instance)
statsConfig.ConfigProcess(*instance)
mdata.ConfigProcess()
Expand Down Expand Up @@ -387,18 +382,14 @@ func main() {
/***********************************
Initialize MetricPersist notifiers
***********************************/
handlers := make([]mdata.NotifierHandler, 0)
var notifiers []mdata.Notifier
if notifierKafka.Enabled {
// The notifierKafka handler will block here until it has processed the backlog of metricPersist messages.
// The notifierKafka notifiers will block here until it has processed the backlog of metricPersist messages.
// it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex))
}

if notifierNsq.Enabled {
handlers = append(handlers, notifierNsq.New(*instance, metrics, metricIndex))
notifiers = append(notifiers, notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)))
}

mdata.InitPersistNotifier(handlers...)
mdata.InitPersistNotifier(notifiers...)

/***********************************
Start our inputs
Expand Down
38 changes: 38 additions & 0 deletions cmd/mt-kafka-persist-sniff/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"encoding/json"
"fmt"

"github.com/grafana/metrictank/mdata"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)

type PrintNotifierHandler struct{}

func NewPrintNotifierHandler() PrintNotifierHandler {
return PrintNotifierHandler{}
}

func (dn PrintNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
return 0, false
}

func (dn PrintNotifierHandler) Handle(data []byte) {
version := uint8(data[0])
if version == uint8(mdata.PersistMessageBatchV1) {
batch := mdata.PersistMessageBatch{}
err := json.Unmarshal(data[1:], &batch)
if err != nil {
log.Errorf("failed to unmarsh batch message: %s -- skipping", err)
return
}
for _, c := range batch.SavedChunks {
fmt.Printf("%s %d %s\n", batch.Instance, c.T0, c.Key)
}
} else {
log.Errorf("unknown message version %d", version)
}
return
}
55 changes: 55 additions & 0 deletions cmd/mt-kafka-persist-sniff/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata/notifierKafka"
"github.com/grafana/metrictank/stats"
log "github.com/sirupsen/logrus"
)

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-kafka-persist-sniff")
fmt.Fprintln(os.Stderr, "Print what's flowing through kafka metric persist topic")
fmt.Fprintf(os.Stderr, "\nFlags:\n\n")
flag.PrintDefaults()
notifierKafka.FlagSet.PrintDefaults()
}
formatter := &logger.TextFormatter{}
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
log.SetLevel(log.InfoLevel)
instance := "mt-kafka-persist-sniff" + strconv.Itoa(rand.Int())

notifierKafka.FlagSet.Usage = flag.Usage
notifierKafka.FlagSet.Parse(os.Args[1:])
// config may have had it disabled
notifierKafka.Enabled = true

stats.NewDevnull() // make sure metrics don't pile up without getting discarded

notifierKafka.ConfigProcess(instance)

done := make(chan struct{})
go func() {
notifierKafka.New(instance, NewPrintNotifierHandler())
close(done)
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

select {
case sig := <-sigChan:
log.Infof("Received signal %q. Shutting down", sig)
case <-done:
}
// notifierKafka.Stop()
}
92 changes: 0 additions & 92 deletions dashboards/extra/fake-metrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,98 +535,6 @@
],
"title": "New row"
},
{
"collapse": false,
"editable": true,
"height": "250px",
"panels": [
{
"aliasColors": {
"timeout_count": "#890F02"
},
"bars": false,
"datasource": "graphite",
"editable": true,
"error": false,
"fill": 1,
"grid": {
"threshold1": null,
"threshold1Color": "rgba(216, 200, 27, 0.27)",
"threshold2": null,
"threshold2Color": "rgba(234, 112, 112, 0.22)"
},
"id": 4,
"isNew": true,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "depth",
"fill": 3,
"linewidth": 0,
"yaxis": 2
}
],
"span": 12,
"stack": false,
"steppedLine": false,
"targets": [
{
"refId": "A",
"target": "aliasByNode(stats.$environment.nsq.*.topic.metrics.channel.tank.*, 8)"
},
{
"refId": "B",
"target": "aliasByNode(stats.$environment.gauges.nsq.*.topic.metrics.depth, 7)"
}
],
"timeFrom": null,
"timeShift": null,
"title": "NSQ topic/channel",
"tooltip": {
"shared": true,
"value_type": "cumulative",
"msResolution": false
},
"type": "graph",
"yaxes": [
{
"show": true,
"min": null,
"max": null,
"logBase": 1,
"format": "short"
},
{
"show": true,
"min": null,
"max": null,
"logBase": 1,
"format": "short"
}
],
"xaxis": {
"show": true
}
}
],
"title": "New row"
},
{
"collapse": false,
"editable": true,
Expand Down
17 changes: 0 additions & 17 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -332,23 +332,6 @@ offset = newest
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s

### nsq as transport for clustering messages
[nsq-cluster]
enabled = false
# nsqd TCP address (may be given multiple times as comma-separated list)
nsqd-tcp-address =
# lookupd HTTP address (may be given multiple times as comma-separated list)
lookupd-http-address =
topic = metricpersist
channel = tank
# passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
producer-opt =
#passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
consumer-opt =
# max number of messages to allow in flight
max-in-flight = 200


## metric metadata index ##

### in memory, cassandra-backed
Expand Down
Loading

0 comments on commit 61ea62a

Please sign in to comment.