Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1173 from grafana/mt-update-ttl-updates
Browse files Browse the repository at this point in the history
mt-update-ttl : use standard store, specify TTLs not tables, auto-create tables + misc
  • Loading branch information
Dieterbe authored Dec 11, 2018
2 parents fb1478e + 5f52ccb commit 51042d1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 130 deletions.
179 changes: 74 additions & 105 deletions cmd/mt-update-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,27 @@ import (
"fmt"
"math"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gocql/gocql"
"github.com/grafana/metrictank/logger"
"github.com/grafana/metrictank/stats"
"github.com/grafana/metrictank/store/cassandra"
hostpool "github.com/hailocab/go-hostpool"
"github.com/raintank/dur"
log "github.com/sirupsen/logrus"
)

const maxToken = math.MaxInt64 // 9223372036854775807

var (
cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)")
cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table")
cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
cassandraTimeout = flag.String("cassandra-timeout", "1s", "cassandra timeout")
cassandraConcurrency = flag.Int("cassandra-concurrency", 20, "max number of concurrent reads to cassandra.")
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")

cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
cassandraHostVerification = flag.Bool("cassandra-host-verification", true, "host (hostname and server cert) verification when using SSL")

cassandraAuth = flag.Bool("cassandra-auth", false, "enable cassandra authentication")
cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication")
cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication")

cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")

startTs = flag.Int("start-timestamp", 0, "timestamp at which to start, defaults to 0")
endTs = flag.Int("end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max")
numThreads = flag.Int("threads", 10, "number of workers to use to process data")

verbose = flag.Bool("verbose", false, "show every record being processed")

doneKeys uint64
doneRows uint64
doneKeys uint64
doneRows uint64
startTs int
endTs int
numThreads int
statusEvery int
verbose bool
)

func init() {
Expand All @@ -58,89 +36,71 @@ func init() {
}

func main() {
cfg := cassandra.CliConfig
flag.StringVar(&cfg.Addrs, "cassandra-addrs", cfg.Addrs, "cassandra host (may be given multiple times as comma-separated list)")
flag.StringVar(&cfg.Keyspace, "cassandra-keyspace", cfg.Keyspace, "cassandra keyspace to use for storing the metric data table")
flag.StringVar(&cfg.Consistency, "cassandra-consistency", cfg.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
flag.StringVar(&cfg.HostSelectionPolicy, "host-selection-policy", cfg.HostSelectionPolicy, "")
flag.StringVar(&cfg.Timeout, "cassandra-timeout", cfg.Timeout, "cassandra timeout")
flag.IntVar(&cfg.WriteConcurrency, "cassandra-concurrency", 20, "number of concurrent connections to cassandra.") // this will launch idle write goroutines which we don't need but we can clean this up later.
flag.IntVar(&cfg.Retries, "cassandra-retries", cfg.Retries, "how many times to retry a query before failing it")
flag.IntVar(&cfg.WindowFactor, "window-factor", cfg.WindowFactor, "size of compaction window relative to TTL")
flag.IntVar(&cfg.CqlProtocolVersion, "cql-protocol-version", cfg.CqlProtocolVersion, "cql protocol version to use")
flag.BoolVar(&cfg.CreateKeyspace, "create-keyspace", cfg.CreateKeyspace, "enable the creation of the keyspace and tables")
flag.BoolVar(&cfg.SSL, "cassandra-ssl", cfg.SSL, "enable SSL connection to cassandra")
flag.StringVar(&cfg.CaPath, "cassandra-ca-path", cfg.CaPath, "cassandra CA certificate path when using SSL")
flag.BoolVar(&cfg.HostVerification, "cassandra-host-verification", cfg.HostVerification, "host (hostname and server cert) verification when using SSL")
flag.BoolVar(&cfg.Auth, "cassandra-auth", cfg.Auth, "enable cassandra authentication")
flag.StringVar(&cfg.Username, "cassandra-username", cfg.Username, "username for authentication")
flag.StringVar(&cfg.Password, "cassandra-password", cfg.Password, "password for authentication")
flag.StringVar(&cfg.SchemaFile, "schema-file", cfg.SchemaFile, "File containing the needed schemas in case database needs initializing")
flag.BoolVar(&cfg.DisableInitialHostLookup, "cassandra-disable-initial-host-lookup", cfg.DisableInitialHostLookup, "instruct the driver to not attempt to get host info from the system.peers table")

cfg.ReadConcurrency = 0
cfg.ReadQueueSize = 0
cfg.WriteQueueSize = 0

flag.IntVar(&startTs, "start-timestamp", 0, "timestamp at which to start, defaults to 0")
flag.IntVar(&endTs, "end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max")
flag.IntVar(&numThreads, "threads", 10, "number of workers to use to process data")
flag.IntVar(&statusEvery, "status-every", 100000, "print status every x keys")

flag.BoolVar(&verbose, "verbose", false, "show every record being processed")

flag.Usage = func() {
fmt.Fprintln(os.Stderr, "mt-update-ttl [flags] ttl table-in [table-out]")
fmt.Fprintln(os.Stderr, "mt-update-ttl [flags] ttl-old ttl-new")
fmt.Fprintln(os.Stderr)
fmt.Fprintln(os.Stderr, "Adjusts the data in Cassandra to use a new TTL value. The TTL is applied counting from the timestamp of the data")
fmt.Fprintln(os.Stderr, "If table-out not specified or same as table-in, will update in place. Otherwise will not touch input table and store results in table-out")
fmt.Fprintln(os.Stderr, "In that case, it is up to you to assure table-out exists before running this tool")
fmt.Fprintln(os.Stderr, "Not supported yet: for the per-ttl tables as of 0.7, automatically putting data in the right table")
fmt.Fprintln(os.Stderr, "Automatically resolves the corresponding tables based on ttl value. If the table stays the same, will update in place. Otherwise will copy to the new table, not touching the input data")
fmt.Fprintln(os.Stderr, "Unless you disable create-keyspace, tables are created as needed")
fmt.Println("Flags:")
flag.PrintDefaults()
os.Exit(-1)
}
flag.Parse()

if flag.NArg() < 2 || flag.NArg() > 3 {
stats.NewDevnull() // make sure metrics don't pile up without getting discarded

if flag.NArg() != 2 {
flag.Usage()
os.Exit(2)
}

ttl := int(dur.MustParseNDuration("ttl", flag.Arg(0)))
tableIn, tableOut := flag.Arg(1), flag.Arg(1)
if flag.NArg() == 3 {
tableOut = flag.Arg(2)
}
ttlIn := dur.MustParseNDuration("ttl", flag.Arg(0))
ttlOut := dur.MustParseNDuration("ttl", flag.Arg(1))

session, err := NewCassandraStore()
// note: via its TTLTables attribute, the cassandraStore is only aware of the tables for the two
// TTLs we pass in (the one we read from and the one we copy to), not of any other pre-existing
// tables. That's ok because we don't exercise any functionality that needs those.
store, err := cassandra.NewCassandraStore(cassandra.CliConfig, []uint32{ttlIn, ttlOut})

if err != nil {
log.Fatalf("Failed to instantiate cassandra: %s", err)
}

update(session, ttl, tableIn, tableOut)
}

func NewCassandraStore() (*gocql.Session, error) {
cluster := gocql.NewCluster(strings.Split(*cassandraAddrs, ",")...)
if *cassandraSSL {
cluster.SslOpts = &gocql.SslOptions{
CaPath: *cassandraCaPath,
EnableHostVerification: *cassandraHostVerification,
}
}
if *cassandraAuth {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: *cassandraUsername,
Password: *cassandraPassword,
}
}
cluster.Consistency = gocql.ParseConsistency(*cassandraConsistency)
cluster.Timeout = cassandra.ConvertTimeout(*cassandraTimeout, time.Millisecond)
cluster.NumConns = *cassandraConcurrency
cluster.ProtoVersion = *cqlProtocolVersion
cluster.Keyspace = *cassandraKeyspace
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: *cassandraRetries}
cluster.DisableInitialHostLookup = *cassandraDisableInitialHostLookup

switch *cassandraHostSelectionPolicy {
case "roundrobin":
cluster.PoolConfig.HostSelectionPolicy = gocql.RoundRobinHostPolicy()
case "hostpool-simple":
cluster.PoolConfig.HostSelectionPolicy = gocql.HostPoolHostPolicy(hostpool.New(nil))
case "hostpool-epsilon-greedy":
cluster.PoolConfig.HostSelectionPolicy = gocql.HostPoolHostPolicy(
hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
)
case "tokenaware,roundrobin":
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
gocql.RoundRobinHostPolicy(),
)
case "tokenaware,hostpool-simple":
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
gocql.HostPoolHostPolicy(hostpool.New(nil)),
)
case "tokenaware,hostpool-epsilon-greedy":
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
gocql.HostPoolHostPolicy(
hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
),
)
default:
return nil, fmt.Errorf("unknown HostSelectionPolicy '%q'", *cassandraHostSelectionPolicy)
}
tableIn, tableOut := store.TTLTables[ttlIn].Name, store.TTLTables[ttlOut].Name

return cluster.CreateSession()
update(store, int(ttlOut), tableIn, tableOut)
}

func getTTL(now, ts, ttl int) int {
Expand All @@ -164,37 +124,46 @@ func completenessEstimate(token int64) float64 {
return ((float64(token) / float64(maxToken)) + 1) / 2
}

func worker(id int, jobs <-chan string, wg *sync.WaitGroup, session *gocql.Session, startTime, endTime, ttl int, tableIn, tableOut string) {
func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.CassandraStore, startTime, endTime, ttlOut int, tableIn, tableOut string) {
defer wg.Done()
var token int64
var ts int
var data []byte
var query string
pre := time.Now()
queryTpl := fmt.Sprintf("SELECT token(key), ts, data FROM %s where key=? AND ts>=? AND ts<?", tableIn)

for key := range jobs {
iter := session.Query(queryTpl, key, startTime, endTime).Iter()
iter := store.Session.Query(queryTpl, key, startTime, endTime).Iter()
for iter.Scan(&token, &ts, &data) {
newTTL := getTTL(int(time.Now().Unix()), ts, ttl)
newTTL := getTTL(int(time.Now().Unix()), ts, ttlOut)
if tableIn == tableOut {
query = fmt.Sprintf("UPDATE %s USING TTL %d SET data = ? WHERE key = ? AND ts = ?", tableIn, newTTL)
} else {
query = fmt.Sprintf("INSERT INTO %s (data, key, ts) values(?,?,?) USING TTL %d", tableOut, newTTL)
}
if *verbose {
if verbose {
log.Infof("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data)
}

err := session.Query(query, data, key, ts).Exec()
err := store.Session.Query(query, data, key, ts).Exec()
if err != nil {
log.Errorf("id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err)
}

doneRowsSnap := atomic.AddUint64(&doneRows, 1)
if doneRowsSnap%10000 == 0 {
if doneRowsSnap%uint64(statusEvery) == 0 {
doneKeysSnap := atomic.LoadUint64(&doneKeys)
completeness := completenessEstimate(token)
log.Infof("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)", id, doneKeysSnap, doneRowsSnap, token, completeness*100)
if completeness == 0 {
completeness = math.SmallestNonzeroFloat64
}

doneDur := time.Since(pre)
totalDur := doneDur.Seconds() / completeness
leftDur := (time.Second*time.Duration(int64(totalDur)) - doneDur).Round(time.Second)
eta := time.Now().Add(leftDur).Round(time.Second).Format("2006-1-2 15:04:05")
log.Infof("WORKING: id=%d processed %d keys, %d rows. (last token: %d, estimates: completeness %.1f%% - remaining %s - ETA %s)", id, doneKeysSnap, doneRowsSnap, token, completeness*100, leftDur, eta)
}
}
err := iter.Close()
Expand All @@ -207,16 +176,16 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, session *gocql.Sessi
}
}

func update(session *gocql.Session, ttl int, tableIn, tableOut string) {
func update(store *cassandra.CassandraStore, ttlOut int, tableIn, tableOut string) {

keyItr := session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()
keyItr := store.Session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()

jobs := make(chan string, 100)

var wg sync.WaitGroup
wg.Add(*numThreads)
for i := 0; i < *numThreads; i++ {
go worker(i, jobs, &wg, session, *startTs, *endTs, ttl, tableIn, tableOut)
wg.Add(numThreads)
for i := 0; i < numThreads; i++ {
go worker(i, jobs, &wg, store, startTs, endTs, ttlOut, tableIn, tableOut)
}

var key string
Expand Down
21 changes: 14 additions & 7 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,11 @@ Flags:
## mt-update-ttl

```
mt-update-ttl [flags] ttl table-in [table-out]
mt-update-ttl [flags] ttl-old ttl-new
Adjusts the data in Cassandra to use a new TTL value. The TTL is applied counting from the timestamp of the data
If table-out not specified or same as table-in, will update in place. Otherwise will not touch input table and store results in table-out
In that case, it is up to you to assure table-out exists before running this tool
Not supported yet: for the per-ttl tables as of 0.7, automatically putting data in the right table
Automatically resolves the corresponding tables based on ttl value. If the table stays the same, will update in place. Otherwise will copy to the new table, not touching the input data
Unless you disable create-keyspace, tables are created as needed
Flags:
-cassandra-addrs string
cassandra host (may be given multiple times as comma-separated list) (default "localhost")
Expand All @@ -525,13 +524,11 @@ Flags:
-cassandra-ca-path string
cassandra CA certificate path when using SSL (default "/etc/metrictank/ca.pem")
-cassandra-concurrency int
max number of concurrent reads to cassandra. (default 20)
number of concurrent connections to cassandra. (default 20)
-cassandra-consistency string
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
-cassandra-disable-initial-host-lookup
instruct the driver to not attempt to get host info from the system.peers table
-cassandra-host-selection-policy string
(default "tokenaware,hostpool-epsilon-greedy")
-cassandra-host-verification
host (hostname and server cert) verification when using SSL (default true)
-cassandra-keyspace string
Expand All @@ -548,14 +545,24 @@ Flags:
username for authentication (default "cassandra")
-cql-protocol-version int
cql protocol version to use (default 4)
-create-keyspace
enable the creation of the keyspace and tables (default true)
-end-timestamp int
timestamp at which to stop, defaults to int max (default 2147483647)
-host-selection-policy string
(default "tokenaware,hostpool-epsilon-greedy")
-schema-file string
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-store-cassandra.toml")
-start-timestamp int
timestamp at which to start, defaults to 0
-status-every int
print status every x keys (default 100000)
-threads int
number of workers to use to process data (default 10)
-verbose
show every record being processed
-window-factor int
size of compaction window relative to TTL (default 20)
```


Expand Down
36 changes: 18 additions & 18 deletions store/cassandra/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,25 @@ func GetTable(ttl uint32, windowFactor int, nameFormat string) Table {
* generated with: https://gist.github.com/replay/69ad7cfd523edfa552cd12851fa74c58
*
* +------------------------+---------------+---------------------+----------+
* | TTL hours | table_name | window_size (hours) | sstables |
* | TTL hours | table_name | window_size (hours) | sstables |
* +------------------------+---------------+---------------------+----------+
* | 0 <= hours < 1 | metrics_0 | 1 | 0 - 2 |
* | 1 <= hours < 2 | metrics_1 | 1 | 1 - 3 |
* | 2 <= hours < 4 | metrics_2 | 1 | 2 - 5 |
* | 4 <= hours < 8 | metrics_4 | 1 | 4 - 9 |
* | 8 <= hours < 16 | metrics_8 | 1 | 8 - 17 |
* | 16 <= hours < 32 | metrics_16 | 1 | 16 - 33 |
* | 32 <= hours < 64 | metrics_32 | 2 | 16 - 33 |
* | 64 <= hours < 128 | metrics_64 | 4 | 16 - 33 |
* | 128 <= hours < 256 | metrics_128 | 7 | 19 - 38 |
* | 256 <= hours < 512 | metrics_256 | 13 | 20 - 41 |
* | 512 <= hours < 1024 | metrics_512 | 26 | 20 - 41 |
* | 1024 <= hours < 2048 | metrics_1024 | 52 | 20 - 41 |
* | 2048 <= hours < 4096 | metrics_2048 | 103 | 20 - 41 |
* | 4096 <= hours < 8192 | metrics_4096 | 205 | 20 - 41 |
* | 8192 <= hours < 16384 | metrics_8192 | 410 | 20 - 41 |
* | 16384 <= hours < 32768 | metrics_16384 | 820 | 20 - 41 |
* | 32768 <= hours < 65536 | metrics_32768 | 1639 | 20 - 41 |
* | 0 <= hours < 1 | metric_0 | 1 | 0 - 2 |
* | 1 <= hours < 2 | metric_1 | 1 | 1 - 3 |
* | 2 <= hours < 4 | metric_2 | 1 | 2 - 5 |
* | 4 <= hours < 8 | metric_4 | 1 | 4 - 9 |
* | 8 <= hours < 16 | metric_8 | 1 | 8 - 17 |
* | 16 <= hours < 32 | metric_16 | 1 | 16 - 33 |
* | 32 <= hours < 64 | metric_32 | 2 | 16 - 33 |
* | 64 <= hours < 128 | metric_64 | 4 | 16 - 33 |
* | 128 <= hours < 256 | metric_128 | 7 | 19 - 38 |
* | 256 <= hours < 512 | metric_256 | 13 | 20 - 41 |
* | 512 <= hours < 1024 | metric_512 | 26 | 20 - 41 |
* | 1024 <= hours < 2048 | metric_1024 | 52 | 20 - 41 |
* | 2048 <= hours < 4096 | metric_2048 | 103 | 20 - 41 |
* | 4096 <= hours < 8192 | metric_4096 | 205 | 20 - 41 |
* | 8192 <= hours < 16384 | metric_8192 | 410 | 20 - 41 |
* | 16384 <= hours < 32768 | metric_16384 | 820 | 20 - 41 |
* | 32768 <= hours < 65536 | metric_32768 | 1639 | 20 - 41 |
* +------------------------+---------------+---------------------+----------+
*/

Expand Down

0 comments on commit 51042d1

Please sign in to comment.