Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1147 from grafana/mt-store-cat-improvements
Browse files Browse the repository at this point in the history
mt-store-cat improvements: glob filter, chunk-csv output
  • Loading branch information
Dieterbe authored Nov 23, 2018
2 parents 2cd05ea + d174fbe commit 6aa4eaa
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 102 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions cmd/mt-store-cat/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package main

import (
"context"
"fmt"
"time"

"github.com/grafana/metrictank/store/cassandra"
log "github.com/sirupsen/logrus"
)

// printChunkSummary prints a summary of chunks in the store matching the given conditions, grouped in buckets of groupTTL size by their TTL
func printChunkSummary(ctx context.Context, store *cassandra.CassandraStore, tables []cassandra.Table, metrics []Metric, groupTTL string) error {
now := uint32(time.Now().Unix())
end_month := now - (now % cassandra.Month_sec)

for _, tbl := range tables {
// per store.FindExistingTables(), actual TTL may be up to 2x what's in tablename.
// we query up to 4x so that we also include data that should have been dropped already but still sticks around for whatever reason.
start := now - uint32(4*tbl.TTL*3600)
start_month := start - (start % cassandra.Month_sec)
fmt.Println("## Table", tbl.Name)
if len(metrics) == 0 {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl.Name)
iter := store.Session.Query(query).Iter()
showKeyTTL(iter, groupTTL)
} else {
for _, metric := range metrics {
for month := start_month; month <= end_month; month += cassandra.Month_sec {
row_key := fmt.Sprintf("%s_%d", metric.AMKey.String(), month/cassandra.Month_sec)
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl.Name)
iter := store.Session.Query(query, row_key).Iter()
showKeyTTL(iter, groupTTL)
}
}
}
}
return nil
}

func printChunkCsv(ctx context.Context, store *cassandra.CassandraStore, table cassandra.Table, metrics []Metric, start, end uint32) {

// see CassandraStore.SearchTable for more information
start_month := start - (start % cassandra.Month_sec) // starting row has to be at, or before, requested start
end_month := (end - 1) - ((end - 1) % cassandra.Month_sec) // ending row has to include the last point we might need (end-1)
startMonthNum := start_month / cassandra.Month_sec
endMonthNum := end_month / cassandra.Month_sec
rowKeys := make([]string, endMonthNum-startMonthNum+1)

query := fmt.Sprintf("SELECT key, ts, data FROM %s WHERE key IN ? AND ts < ?", table.Name)

for _, metric := range metrics {
i := 0
for num := startMonthNum; num <= endMonthNum; num += 1 {
rowKeys[i] = fmt.Sprintf("%s_%d", metric.AMKey, num)
i++
}
params := []interface{}{rowKeys, end}
iter := store.Session.Query(query, params...).WithContext(ctx).Iter()
var key string
var ts int
var b []byte
for iter.Scan(&key, &ts, &b) {
fmt.Printf("%s,%d,0x%x\n", key, ts, b)
}
err := iter.Close()
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
log.Fatal("query was aborted")
}
log.Fatalf("query failure: %v", err)
}
}
}
File renamed without changes.
37 changes: 0 additions & 37 deletions cmd/mt-store-cat/full.go

This file was deleted.

94 changes: 68 additions & 26 deletions cmd/mt-store-cat/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ var (
printTs = flag.Bool("print-ts", false, "print time stamps instead of formatted dates. only for points and point-summary format")
groupTTL = flag.String("groupTTL", "d", "group chunks in TTL buckets: s (second. means unbucketed), m (minute), h (hour) or d (day). only for chunk-summary format")
timeZoneStr = flag.String("time-zone", "local", "time-zone to use for interpreting from/to when needed. (check your config)")
archiveStr = flag.String("archive", "", "archive to fetch for given metric. e.g. 'sum_1800'")
verbose bool

printTime func(ts uint32) string
)
Expand All @@ -46,6 +48,7 @@ func init() {
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
log.SetFormatter(formatter)
log.SetLevel(log.InfoLevel)
flag.BoolVar(&verbose, "verbose", false, "verbose (print stuff about the request)")
}

func main() {
Expand Down Expand Up @@ -85,11 +88,12 @@ func main() {
fmt.Println()
fmt.Printf(" mt-store-cat [flags] <table-selector> <metric-selector> <format>\n")
fmt.Printf(" table-selector: '*' or name of a table. e.g. 'metric_128'\n")
fmt.Printf(" metric-selector: '*' or an id (of raw or aggregated series) or prefix:<prefix>\n")
fmt.Printf(" metric-selector: '*' or an id (of raw or aggregated series) or prefix:<prefix> or substr:<substring> or glob:<pattern>\n")
fmt.Printf(" format:\n")
fmt.Printf(" - points\n")
fmt.Printf(" - point-summary\n")
fmt.Printf(" - chunk-summary (shows TTL's, optionally bucketed. See groupTTL flag)\n")
fmt.Printf(" - chunk-csv (for importing into cassandra)\n")
fmt.Println()
fmt.Println("EXAMPLES:")
fmt.Println("mt-store-cat -cassandra-keyspace metrictank -from='-1min' '*' '1.77c8c77afa22b67ef5b700c2a2b88d5f' points")
Expand Down Expand Up @@ -125,14 +129,13 @@ func main() {
}
metricSelector = flag.Arg(1)
format = flag.Arg(2)
if format != "points" && format != "point-summary" && format != "chunk-summary" {
if format != "points" && format != "point-summary" && format != "chunk-summary" && format != "chunk-csv" {
flag.Usage()
os.Exit(-1)
}
if metricSelector == "prefix:" {
log.Fatal("prefix cannot be empty")
if metricSelector == "prefix:" || metricSelector == "substr:" || metricSelector == "glob:" {
log.Fatal("prefix/substr/glob cannot be empty")
}

}

// Only try and parse the conf file if it exists
Expand Down Expand Up @@ -194,24 +197,39 @@ func main() {
log.Fatalf("failed to read tables from cassandra. %s", err.Error())
}

if tableSelector == "tables" {
tables, err := getTables(store, "")
var archive schema.Archive
if *archiveStr != "" {
archive, err = schema.ArchiveFromString(*archiveStr)
if err != nil {
log.Fatal(err.Error())
}
for _, table := range tables {
fmt.Printf("%s (%d hours <= ttl < %d hours)\n", table.Name, table.TTL, table.TTL*2)
log.Fatalf("could not parse archive %q: %s", *archiveStr, err)
}
}

// set up is done, now actually execute the business logic

// handle "tables"
if tableSelector == "tables" {
printTables(store)
return
}

// handle the case where we have a table-selector, metric-selector and format
// table-selector: '*' or name of a table. e.g. 'metric_128'
// metric-selector: '*' or an id (of raw or aggregated series) or prefix:<prefix> or substr:<substring> or glob:<pattern>
// format: points, point-summary, chunk-summary or chunk-csv

if format == "chunk-csv" && (tableSelector == "*" || tableSelector == "") {
log.Fatal("chunk-csv format can be used with 1 cassandra table only")
}

tables, err := getTables(store, tableSelector)
if err != nil {
log.Fatal(err.Error())
}

var fromUnix, toUnix uint32

if format == "points" || format == "point-summary" {
if format == "points" || format == "point-summary" || format == "chunk-csv" {
now := time.Now()
defaultFrom := uint32(now.Add(-time.Duration(24) * time.Hour).Unix())
defaultTo := uint32(now.Add(time.Duration(1) * time.Second).Unix())
Expand All @@ -228,25 +246,41 @@ func main() {
}
var metrics []Metric
if metricSelector == "*" {
fmt.Println("# Looking for ALL metrics")
if verbose {
fmt.Println("# Looking for ALL metrics")
}
// chunk-summary doesn't need an explicit listing. it knows if metrics is empty, to query all
// but the other two do need an explicit listing.
if format == "points" || format == "point-summary" {
metrics, err = getMetrics(store, "")
metrics, err = getMetrics(store, "", "", "", archive)
if err != nil {
log.Errorf("cassandra query error. %s", err.Error())
return
}
}
} else if strings.HasPrefix(metricSelector, "prefix:") {
fmt.Println("# Looking for these metrics:")
metrics, err = getMetrics(store, strings.Replace(metricSelector, "prefix:", "", 1))
} else if strings.HasPrefix(metricSelector, "prefix:") || strings.HasPrefix(metricSelector, "substr:") || strings.HasPrefix(metricSelector, "glob:") {
var prefix, substr, glob string
if strings.HasPrefix(metricSelector, "prefix:") {
prefix = strings.Replace(metricSelector, "prefix:", "", 1)
}
if strings.HasPrefix(metricSelector, "substr:") {
substr = strings.Replace(metricSelector, "substr:", "", 1)
}
if strings.HasPrefix(metricSelector, "glob:") {
glob = strings.Replace(metricSelector, "glob:", "", 1)
}
if verbose {
fmt.Println("# Looking for these metrics:")
}
metrics, err = getMetrics(store, prefix, substr, glob, archive)
if err != nil {
log.Errorf("cassandra query error. %s", err.Error())
return
}
for _, m := range metrics {
fmt.Println(m)
if verbose {
for _, m := range metrics {
fmt.Println(m)
}
}
} else {
amkey, err := schema.AMKeyFromString(metricSelector)
Expand All @@ -255,7 +289,9 @@ func main() {
return
}

fmt.Println("# Looking for this metric:")
if verbose {
fmt.Println("# Looking for this metric:")
}

metrics, err = getMetric(store, amkey)
if err != nil {
Expand All @@ -266,22 +302,28 @@ func main() {
fmt.Printf("metric id %v not found", amkey.MKey)
return
}
for _, m := range metrics {
fmt.Println(m)
if verbose {
for _, m := range metrics {
fmt.Println(m)
}
}
}

fmt.Printf("# Keyspace %q:\n", storeConfig.Keyspace)
if verbose {
fmt.Printf("# Keyspace %q:\n", storeConfig.Keyspace)
}

span := tracer.StartSpan("mt-store-cat " + format)
ctx := opentracing.ContextWithSpan(context.Background(), span)

switch format {
case "points":
points(ctx, store, tables, metrics, fromUnix, toUnix, uint32(*fix))
printPoints(ctx, store, tables, metrics, fromUnix, toUnix, uint32(*fix))
case "point-summary":
pointSummary(ctx, store, tables, metrics, fromUnix, toUnix, uint32(*fix))
printPointSummary(ctx, store, tables, metrics, fromUnix, toUnix, uint32(*fix))
case "chunk-summary":
chunkSummary(ctx, store, tables, metrics, *groupTTL)
printChunkSummary(ctx, store, tables, metrics, *groupTTL)
case "chunk-csv":
printChunkCsv(ctx, store, tables[0], metrics, fromUnix, toUnix)
}
}
50 changes: 50 additions & 0 deletions cmd/mt-store-cat/match_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import "testing"

type matchCase struct {
name string
match bool
}

func TestMatchSimpleAsterisk(t *testing.T) {
patt := "foo.*.bar"
matchCases := []matchCase{
{"foo", false},
{"bar", false},
{"foo.bar", false},
{"foo.abc.bar", true},
{"fooa.abc.bar", false},
{"foo.abc.cbar", false},
{"foo.abc.def.bar", false},
}
testmatch(patt, matchCases, t)
}

func TestMatchDoubleAsterisk(t *testing.T) {
patt := "foo.*.bar.*"
matchCases := []matchCase{
{"foo", false},
{"bar", false},
{"foo.bar", false},
{"foo.abc.bar", false},
{"foo.bar.baz", false},
{"foo.abc.bar.a", true},
{"fooa.abc.bar.a", false},
{"foo.abc.cbar.a", false},
{"foo.abc.def.bar.a", false},
}
testmatch(patt, matchCases, t)
}

func testmatch(patt string, matchCases []matchCase, t *testing.T) {
for _, mc := range matchCases {
metric := Metric{
name: mc.name,
}
ok := match("", "", patt, metric)
if ok != mc.match {
t.Fatalf("case match('','',%q, Metric{name:%q}) -> expected %t, got %t", patt, mc.name, mc.match, ok)
}
}
}
Loading

0 comments on commit 6aa4eaa

Please sign in to comment.