Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

notifier: deprecate NSQ ( rip :'( ) + add mt-kafka-persist-sniff tool #1161

Merged
merged 5 commits into from
Dec 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/cmd/mt-index-migrate/mt-index-migrate
/cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
37 changes: 0 additions & 37 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 0 additions & 12 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ unused-packages = true
name = "github.com/alyu/configparser"
branch = "master"

[[constraint]]
name = "github.com/bitly/go-hostpool"
branch = "master"

[[constraint]]
name = "github.com/davecgh/go-spew"
version = "1.1.0"
Expand Down Expand Up @@ -97,10 +93,6 @@ unused-packages = true
name = "github.com/mitchellh/go-homedir"
branch = "master"

[[constraint]]
name = "github.com/nsqio/go-nsq"
revision = "642a3f9935f12cb3b747294318d730f56f4c34b4"

[[constraint]]
name = "github.com/opentracing/opentracing-go"
version = "^1"
Expand All @@ -117,10 +109,6 @@ unused-packages = true
name = "github.com/raintank/met"
branch = "master"

[[constraint]]
name = "github.com/raintank/misc"
branch = "master"

[[constraint]]
name = "github.com/raintank/schema"
version = "^2"
Expand Down
17 changes: 4 additions & 13 deletions cmd/metrictank/metrictank.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/grafana/metrictank/mdata"
"github.com/grafana/metrictank/mdata/cache"
"github.com/grafana/metrictank/mdata/notifierKafka"
"github.com/grafana/metrictank/mdata/notifierNsq"
"github.com/grafana/metrictank/stats"
statsConfig "github.com/grafana/metrictank/stats/config"
bigtableStore "github.com/grafana/metrictank/store/bigtable"
Expand Down Expand Up @@ -112,9 +111,6 @@ func main() {
inKafkaMdm.ConfigSetup()
inPrometheus.ConfigSetup()

// load config for cluster handlers
notifierNsq.ConfigSetup()

// load config for metricIndexers
memory.ConfigSetup()
cassandra.ConfigSetup()
Expand Down Expand Up @@ -184,7 +180,6 @@ func main() {
inKafkaMdm.ConfigProcess(*instance)
memory.ConfigProcess()
inPrometheus.ConfigProcess()
notifierNsq.ConfigProcess()
notifierKafka.ConfigProcess(*instance)
statsConfig.ConfigProcess(*instance)
mdata.ConfigProcess()
Expand Down Expand Up @@ -387,18 +382,14 @@ func main() {
/***********************************
Initialize MetricPersist notifiers
***********************************/
handlers := make([]mdata.NotifierHandler, 0)
var notifiers []mdata.Notifier
if notifierKafka.Enabled {
// The notifierKafka handler will block here until it has processed the backlog of metricPersist messages.
// The notifierKafka notifiers will block here until it has processed the backlog of metricPersist messages.
// it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
handlers = append(handlers, notifierKafka.New(*instance, metrics, metricIndex))
}

if notifierNsq.Enabled {
handlers = append(handlers, notifierNsq.New(*instance, metrics, metricIndex))
notifiers = append(notifiers, notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)))
}

mdata.InitPersistNotifier(handlers...)
mdata.InitPersistNotifier(notifiers...)

/***********************************
Start our inputs
Expand Down
38 changes: 38 additions & 0 deletions cmd/mt-kafka-persist-sniff/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"encoding/json"
"fmt"

"github.com/grafana/metrictank/mdata"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)

type PrintNotifierHandler struct{}

func NewPrintNotifierHandler() PrintNotifierHandler {
return PrintNotifierHandler{}
}

func (dn PrintNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
return 0, false
}

func (dn PrintNotifierHandler) Handle(data []byte) {
version := uint8(data[0])
if version == uint8(mdata.PersistMessageBatchV1) {
batch := mdata.PersistMessageBatch{}
err := json.Unmarshal(data[1:], &batch)
if err != nil {
log.Errorf("failed to unmarsh batch message: %s -- skipping", err)
return
}
for _, c := range batch.SavedChunks {
fmt.Printf("%s %d %s\n", batch.Instance, c.T0, c.Key)
}
} else {
log.Errorf("unknown message version %d", version)
}
return
}
55 changes: 55 additions & 0 deletions cmd/mt-kafka-persist-sniff/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata/notifierKafka"
"github.com/grafana/metrictank/stats"
log "github.com/sirupsen/logrus"
)

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-kafka-persist-sniff")
fmt.Fprintln(os.Stderr, "Print what's flowing through kafka metric persist topic")
fmt.Fprintf(os.Stderr, "\nFlags:\n\n")
flag.PrintDefaults()
notifierKafka.FlagSet.PrintDefaults()
}
formatter := &logger.TextFormatter{}
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
log.SetLevel(log.InfoLevel)
instance := "mt-kafka-persist-sniff" + strconv.Itoa(rand.Int())

notifierKafka.FlagSet.Usage = flag.Usage
notifierKafka.FlagSet.Parse(os.Args[1:])
// config may have had it disabled
notifierKafka.Enabled = true

stats.NewDevnull() // make sure metrics don't pile up without getting discarded

notifierKafka.ConfigProcess(instance)

done := make(chan struct{})
go func() {
notifierKafka.New(instance, NewPrintNotifierHandler())
close(done)
}()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

select {
case sig := <-sigChan:
log.Infof("Received signal %q. Shutting down", sig)
case <-done:
}
// notifierKafka.Stop()
}
92 changes: 0 additions & 92 deletions dashboards/extra/fake-metrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,98 +535,6 @@
],
"title": "New row"
},
{
"collapse": false,
"editable": true,
"height": "250px",
"panels": [
{
"aliasColors": {
"timeout_count": "#890F02"
},
"bars": false,
"datasource": "graphite",
"editable": true,
"error": false,
"fill": 1,
"grid": {
"threshold1": null,
"threshold1Color": "rgba(216, 200, 27, 0.27)",
"threshold2": null,
"threshold2Color": "rgba(234, 112, 112, 0.22)"
},
"id": 4,
"isNew": true,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "connected",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "depth",
"fill": 3,
"linewidth": 0,
"yaxis": 2
}
],
"span": 12,
"stack": false,
"steppedLine": false,
"targets": [
{
"refId": "A",
"target": "aliasByNode(stats.$environment.nsq.*.topic.metrics.channel.tank.*, 8)"
},
{
"refId": "B",
"target": "aliasByNode(stats.$environment.gauges.nsq.*.topic.metrics.depth, 7)"
}
],
"timeFrom": null,
"timeShift": null,
"title": "NSQ topic/channel",
"tooltip": {
"shared": true,
"value_type": "cumulative",
"msResolution": false
},
"type": "graph",
"yaxes": [
{
"show": true,
"min": null,
"max": null,
"logBase": 1,
"format": "short"
},
{
"show": true,
"min": null,
"max": null,
"logBase": 1,
"format": "short"
}
],
"xaxis": {
"show": true
}
}
],
"title": "New row"
},
{
"collapse": false,
"editable": true,
Expand Down
17 changes: 0 additions & 17 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -332,23 +332,6 @@ offset = newest
# Maximum time backlog processing can block during metrictank startup.
backlog-process-timeout = 60s

### nsq as transport for clustering messages
[nsq-cluster]
enabled = false
# nsqd TCP address (may be given multiple times as comma-separated list)
nsqd-tcp-address =
# lookupd HTTP address (may be given multiple times as comma-separated list)
lookupd-http-address =
topic = metricpersist
channel = tank
# passthrough to nsq.Producer (may be given multiple times as comma-separated list, see http://godoc.org/github.com/nsqio/go-nsq#Config)")
producer-opt =
#passthrough to nsq.Consumer (may be given multiple times as comma-separated list, http://godoc.org/github.com/nsqio/go-nsq#Config)")
consumer-opt =
# max number of messages to allow in flight
max-in-flight = 200


## metric metadata index ##

### in memory, cassandra-backed
Expand Down
Loading