Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
add mt-kafka-persist-sniff tool
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Dec 4, 2018
1 parent 3106f88 commit f420a43
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/cmd/mt-index-migrate/mt-index-migrate
/cmd/mt-kafka-mdm-sniff-out-of-order/mt-kafka-mdm-sniff-out-of-order
/cmd/mt-kafka-mdm-sniff/mt-kafka-mdm-sniff
/cmd/mt-kafka-persist-sniff/mt-kafka-persist-sniff
/cmd/mt-schemas-explain/mt-schemas-explain
/cmd/mt-split-metrics-by-ttl/mt-split-metrics-by-ttl
/cmd/mt-store-cat/mt-store-cat
Expand Down
38 changes: 38 additions & 0 deletions cmd/mt-kafka-persist-sniff/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"encoding/json"
"fmt"

"github.com/grafana/metrictank/mdata"
"github.com/raintank/schema"
log "github.com/sirupsen/logrus"
)

type PrintNotifierHandler struct{}

func NewPrintNotifierHandler() PrintNotifierHandler {
return PrintNotifierHandler{}
}

func (dn PrintNotifierHandler) PartitionOf(key schema.MKey) (int32, bool) {
return 0, false
}

func (dn PrintNotifierHandler) Handle(data []byte) {
version := uint8(data[0])
if version == uint8(mdata.PersistMessageBatchV1) {
batch := mdata.PersistMessageBatch{}
err := json.Unmarshal(data[1:], &batch)
if err != nil {
log.Errorf("failed to unmarsh batch message: %s -- skipping", err)
return
}
for _, c := range batch.SavedChunks {
fmt.Printf("%s %d %s\n", batch.Instance, c.T0, c.Key)
}
} else {
log.Errorf("unknown message version %d", version)
}
return
}
62 changes: 62 additions & 0 deletions cmd/mt-kafka-persist-sniff/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"flag"
"fmt"
"math/rand"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/mdata/notifierKafka"
"github.com/grafana/metrictank/stats"
"github.com/rakyll/globalconf"
log "github.com/sirupsen/logrus"
)

var confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")

func main() {
flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-kafka-persist-sniff")
fmt.Fprintln(os.Stderr, "Print what's flowing through kafka metric persist topic")
fmt.Fprintf(os.Stderr, "\nFlags:\n\n")
flag.PrintDefaults()
}
flag.Parse()
formatter := &logger.TextFormatter{}
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
log.SetLevel(log.InfoLevel)
instance := "mt-kafka-persist-sniff" + strconv.Itoa(rand.Int())

// Only try and parse the conf file if it exists
path := ""
if _, err := os.Stat(*confFile); err == nil {
path = *confFile
}
conf, err := globalconf.NewWithOptions(&globalconf.Options{
Filename: path,
EnvPrefix: "MT_",
})
if err != nil {
log.Fatalf("error with configuration file: %s", err.Error())
os.Exit(1)
}
conf.ParseAll()

// config may have had it disabled
notifierKafka.Enabled = true

stats.NewDevnull() // make sure metrics don't pile up without getting discarded

notifierKafka.ConfigProcess(instance)
notifierKafka.New(instance, NewPrintNotifierHandler())
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigChan
log.Infof("Received signal %q. Shutting down", sig)
// notifierKafka.Stop()
}
13 changes: 13 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,19 @@ Flags:
```


## mt-kafka-persist-sniff

```
mt-kafka-persist-sniff
Print what's flowing through kafka metric persist topic
Flags:
-config string
configuration file path (default "/etc/metrictank/metrictank.ini")
```


## mt-schemas-explain

```
Expand Down
1 change: 1 addition & 0 deletions mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func New(instance string, handler mdata.NotifierHandler) *NotifierKafka {
instance: instance,
in: make(chan mdata.SavedChunk),
bPool: util.NewBufferPool(),
handler: handler,
client: client,
consumer: consumer,
producer: producer,
Expand Down

0 comments on commit f420a43

Please sign in to comment.