Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add catastrophe recovery for cassandra #1579

Merged
merged 10 commits into from
Jan 17, 2020
142 changes: 142 additions & 0 deletions cassandra/cassandra_session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package cassandra

import (
"fmt"
"sync"
"time"

"github.com/gocql/gocql"
log "github.com/sirupsen/logrus"
)

// Session stores a connection to Cassandra along with associated configurations
type Session struct {
session *gocql.Session
cluster *gocql.ClusterConfig
shutdown chan struct{}
connectionCheckTimeout time.Duration
connectionCheckInterval time.Duration
addrs string
logPrefix string
sync.RWMutex
}

// NewSession creates and returns a Session. Upon failure it will return nil and an error.
func NewSession(clusterConfig *gocql.ClusterConfig,
shutdown chan struct{},
timeout time.Duration,
interval time.Duration,
addrs string,
logPrefix string) (*Session, error) {
if clusterConfig == nil {
log.Errorf("cassandra.NewSession received nil pointer for ClusterConfig")
return nil, fmt.Errorf("cassandra.NewSession received nil pointer for ClusterConfig")
}

session, err := clusterConfig.CreateSession()
if err != nil {
log.Errorf("cassandra.NewSession failed to create session: %v", err)
return nil, err
}

cs := &Session{
session: session,
cluster: clusterConfig,
shutdown: shutdown,
connectionCheckTimeout: timeout,
connectionCheckInterval: interval,
addrs: addrs,
logPrefix: logPrefix,
}

return cs, nil

}

// DeadConnectionRefresh will run a query using the current Cassandra session every connectionCheckInterval
// if it cannot query Cassandra for longer than connectionCheckTimeout it will create a new session
//
// if you are not using a WaitGroup in the caller, just pass in nil
func (s *Session) DeadConnectionRefresh(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}

ticker := time.NewTicker(s.connectionCheckInterval)
var totaltime time.Duration
var err error
var oldSession *gocql.Session

OUTER:
for {
// connection to Cassandra has been down for longer than the configured timeout
if totaltime >= s.connectionCheckTimeout {
s.Lock()
start := time.Now()
for {
select {
case <-s.shutdown:
log.Infof("%s: received shutdown, exiting DeadConnectionRefresh", s.logPrefix)
if s.session != nil && !s.session.Closed() {
s.session.Close()
}
// make sure we unlock the sessionLock before returning
s.Unlock()
return
default:
log.Errorf("%s: creating new session to cassandra using hosts: %v", s.logPrefix, s.addrs)
if s.session != nil && !s.session.Closed() && oldSession == nil {
oldSession = s.session
}
s.session, err = s.cluster.CreateSession()
if err != nil {
log.Errorf("%s: error while attempting to recreate cassandra session. will retry after %v: %v", s.logPrefix, s.connectionCheckInterval.String(), err)
time.Sleep(s.connectionCheckInterval)
totaltime += s.connectionCheckInterval
// continue inner loop to attempt to reconnect
continue
}
s.Unlock()
log.Errorf("%s: reconnecting to cassandra took %s", s.logPrefix, time.Since(start).String())
totaltime = 0
if oldSession != nil {
oldSession.Close()
oldSession = nil
}
// we connected, so go back to the normal outer loop
continue OUTER
}
}
}

select {
case <-s.shutdown:
log.Infof("%s: received shutdown, exiting DeadConnectionRefresh", s.logPrefix)
if s.session != nil && !s.session.Closed() {
s.session.Close()
}
return
case <-ticker.C:
s.RLock()
// this query should work on all cassandra deployments, but we may need to revisit this
err = s.session.Query("SELECT cql_version FROM system.local").Exec()
s.RUnlock()
if err == nil {
totaltime = 0
} else {
totaltime += s.connectionCheckInterval
log.Errorf("%s: could not execute connection check query for %v: %v", s.logPrefix, totaltime.String(), err)
}
}
}
}

// CurrentSession retrieves the current active Cassandra session
//
// If the connection to Cassandra is down, this will block until it can be restored
func (s *Session) CurrentSession() *gocql.Session {
s.RLock()
session := s.session
s.RUnlock()
return session
}
9 changes: 6 additions & 3 deletions cmd/mt-store-cat/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ func printChunkSummary(ctx context.Context, store *cassandra.CassandraStore, tab
fmt.Println("## Table", tbl.Name)
if len(metrics) == 0 {
query := fmt.Sprintf("select key, ttl(data) from %s", tbl.Name)
iter := store.Session.Query(query).Iter()
session := store.Session.CurrentSession()
iter := session.Query(query).Iter()
showKeyTTL(iter, groupTTL)
} else {
for _, metric := range metrics {
for num := startMonth; num <= endMonth; num += 1 {
row_key := fmt.Sprintf("%s_%d", metric.AMKey.String(), num)
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl.Name)
iter := store.Session.Query(query, row_key).Iter()
session := store.Session.CurrentSession()
iter := session.Query(query, row_key).Iter()
showKeyTTL(iter, groupTTL)
}
}
Expand All @@ -54,7 +56,8 @@ func printChunkCsv(ctx context.Context, store *cassandra.CassandraStore, table c
i++
}
params := []interface{}{rowKeys, end}
iter := store.Session.Query(query, params...).WithContext(ctx).Iter()
session := store.Session.CurrentSession()
iter := session.Query(query, params...).WithContext(ctx).Iter()
var key string
var ts int
var b []byte
Expand Down
6 changes: 4 additions & 2 deletions cmd/mt-store-cat/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func match(prefix, substr, glob string, metric Metric) bool {
// getMetrics lists all metrics from the store matching the given condition.
func getMetrics(idx *cassandra.CasIdx, prefix, substr, glob string, archive schema.Archive) ([]Metric, error) {
var metrics []Metric
iter := idx.Session.Query(fmt.Sprintf("select id, name from %s", idx.Config.Table)).Iter()
session := idx.Session.CurrentSession()
iter := session.Query(fmt.Sprintf("select id, name from %s", idx.Config.Table)).Iter()
var m Metric
var idString string
for iter.Scan(&idString, &m.name) {
Expand Down Expand Up @@ -80,7 +81,8 @@ func getMetrics(idx *cassandra.CasIdx, prefix, substr, glob string, archive sche
func getMetric(idx *cassandra.CasIdx, amkey schema.AMKey) ([]Metric, error) {
var metrics []Metric
// index only stores MKey's, not AMKey's.
iter := idx.Session.Query(fmt.Sprintf("select name from %s where id=? ALLOW FILTERING", idx.Config.Table), amkey.MKey.String()).Iter()
session := idx.Session.CurrentSession()
iter := session.Query(fmt.Sprintf("select name from %s where id=? ALLOW FILTERING", idx.Config.Table), amkey.MKey.String()).Iter()

var m Metric
for iter.Scan(&m.name) {
Expand Down
8 changes: 5 additions & 3 deletions cmd/mt-update-ttl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
queryTpl := fmt.Sprintf("SELECT token(key), ts, data FROM %s where key=? AND ts>=? AND ts<?", tableIn)

for key := range jobs {
iter := store.Session.Query(queryTpl, key, startTime, endTime).Iter()
session := store.Session.CurrentSession()
iter := session.Query(queryTpl, key, startTime, endTime).Iter()
for iter.Scan(&token, &ts, &data) {
newTTL := getTTL(int(time.Now().Unix()), ts, ttlOut)
if tableIn == tableOut {
Expand All @@ -146,7 +147,7 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
log.Infof("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data)
}

err := store.Session.Query(query, data, key, ts).Exec()
err := session.Query(query, data, key, ts).Exec()
if err != nil {
log.Errorf("id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err)
}
Expand Down Expand Up @@ -178,7 +179,8 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas

func update(store *cassandra.CassandraStore, ttlOut int, tableIn, tableOut string) {

keyItr := store.Session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()
session := store.Session.CurrentSession()
keyItr := session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()

jobs := make(chan string, 100)

Expand Down
8 changes: 8 additions & 0 deletions docker/docker-chaos/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ username = cassandra
password = cassandra
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

## Bigtable backend Store Settings ##
[bigtable-store]
Expand Down Expand Up @@ -444,6 +448,10 @@ create-keyspace = false
schema-file = /etc/metrictank/schema-idx-cassandra.toml
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

### in-memory only
[memory-idx]
Expand Down
8 changes: 8 additions & 0 deletions docker/docker-cluster-query/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ username = cassandra
password = cassandra
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

## Bigtable backend Store Settings ##
[bigtable-store]
Expand Down Expand Up @@ -444,6 +448,10 @@ create-keyspace = false
schema-file = /etc/metrictank/schema-idx-cassandra.toml
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

### in-memory only
[memory-idx]
Expand Down
8 changes: 8 additions & 0 deletions docker/docker-cluster/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ username = cassandra
password = cassandra
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

## Bigtable backend Store Settings ##
[bigtable-store]
Expand Down Expand Up @@ -444,6 +448,10 @@ create-keyspace = false
schema-file = /etc/metrictank/schema-idx-cassandra.toml
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

### in-memory only
[memory-idx]
Expand Down
8 changes: 8 additions & 0 deletions docker/docker-dev-custom-cfg-kafka/metrictank.ini
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ username = cassandra
password = cassandra
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

## Bigtable backend Store Settings ##
[bigtable-store]
Expand Down Expand Up @@ -444,6 +448,10 @@ create-keyspace = true
schema-file = /etc/metrictank/schema-idx-cassandra.toml
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s

### in-memory only
[memory-idx]
Expand Down
8 changes: 8 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ username = cassandra
password = cassandra
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s
```

## Bigtable backend Store Settings ##
Expand Down Expand Up @@ -515,6 +519,10 @@ create-keyspace = true
schema-file = /etc/metrictank/schema-idx-cassandra.toml
# instruct the driver to not attempt to get host info from the system.peers table
disable-initial-host-lookup = false
# interval at which to perform a connection check to cassandra, set to 0 to disable.
connection-check-interval = 5s
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
connection-check-timeout = 30s
```

### in-memory only
Expand Down
8 changes: 8 additions & 0 deletions docs/tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ cass config flags:
enable cassandra user authentication
-ca-path string
cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem")
-connection-check-interval duration
interval at which to perform a connection check to cassandra, set to 0 to disable. (default 5s)
-connection-check-timeout duration
maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval. (default 30s)
-consistency string
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
-create-keyspace
Expand Down Expand Up @@ -251,6 +255,10 @@ cass config flags:
enable cassandra user authentication
-ca-path string
cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem")
-connection-check-interval duration
interval at which to perform a connection check to cassandra, set to 0 to disable. (default 5s)
-connection-check-timeout duration
maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval. (default 30s)
-consistency string
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
-create-keyspace
Expand Down
Loading