Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
move cassandra session into cassandra in root
Browse files Browse the repository at this point in the history
  • Loading branch information
robert-milan committed Jan 14, 2020
1 parent 8667be4 commit 17d2658
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 46 deletions.
84 changes: 42 additions & 42 deletions util/cassandra_session.go → cassandra/cassandra_session.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package util
package cassandra

import (
"sync"
Expand All @@ -8,8 +8,8 @@ import (
log "github.com/sirupsen/logrus"
)

// CassandraSession stores a connection to Cassandra along with associated configurations
type CassandraSession struct {
// Session stores a connection to Cassandra along with associated configurations
type Session struct {
session *gocql.Session
cluster *gocql.ClusterConfig
shutdown chan struct{}
Expand All @@ -20,22 +20,22 @@ type CassandraSession struct {
sync.RWMutex
}

// NewCassandraSession creates and returns a CassandraSession
func NewCassandraSession(session *gocql.Session,
// NewSession creates and returns a CassandraSession
func NewSession(session *gocql.Session,
clusterConfig *gocql.ClusterConfig,
shutdown chan struct{},
timeout time.Duration,
interval time.Duration,
addrs string,
logPrefix string) *CassandraSession {
logPrefix string) *Session {
if clusterConfig == nil {
panic("NewCassandraSession received nil pointer for ClusterConfig")
panic("NewSession received nil pointer for ClusterConfig")
}
if session == nil {
panic("NewCassandraSession received nil pointer for session")
panic("NewSession received nil pointer for session")
}

cs := &CassandraSession{
cs := &Session{
session: session,
cluster: clusterConfig,
shutdown: shutdown,
Expand All @@ -53,46 +53,46 @@ func NewCassandraSession(session *gocql.Session,
// if it cannot query Cassandra for longer than connectionCheckTimeout it will create a new session
//
// if you are not using a WaitGroup in the caller, just pass in nil
func (c *CassandraSession) DeadConnectionCheck(wg *sync.WaitGroup) {
func (s *Session) DeadConnectionCheck(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}

ticker := time.NewTicker(time.Second * c.connectionCheckInterval)
ticker := time.NewTicker(time.Second * s.connectionCheckInterval)
var totaltime time.Duration
var err error
var oldSession *gocql.Session

OUTER:
for {
// connection to cassandra has been down for longer than the configured timeout
if totaltime >= c.connectionCheckTimeout {
c.Lock()
// connection to Cassandra has been down for longer than the configured timeout
if totaltime >= s.connectionCheckTimeout {
s.Lock()
for {
select {
case <-c.shutdown:
log.Infof("%s: received shutdown, exiting deadConnectionCheck", c.logPrefix)
if c.session != nil && !c.session.Closed() {
c.session.Close()
case <-s.shutdown:
log.Infof("%s: received shutdown, exiting deadConnectionCheck", s.logPrefix)
if s.session != nil && !s.session.Closed() {
s.session.Close()
}
// make sure we unlock the sessionLock before returning
c.Unlock()
s.Unlock()
return
default:
log.Errorf("%s: creating new session to cassandra using hosts: %v", c.logPrefix, c.addrs)
if c.session != nil && !c.session.Closed() && oldSession == nil {
oldSession = c.session
log.Errorf("%s: creating new session to cassandra using hosts: %v", s.logPrefix, s.addrs)
if s.session != nil && !s.session.Closed() && oldSession == nil {
oldSession = s.session
}
c.session, err = c.cluster.CreateSession()
s.session, err = s.cluster.CreateSession()
if err != nil {
log.Errorf("%s: error while attempting to recreate cassandra session. will retry after %v: %v", c.logPrefix, c.connectionCheckInterval.String(), err)
time.Sleep(c.connectionCheckInterval)
totaltime += c.connectionCheckInterval
log.Errorf("%s: error while attempting to recreate cassandra session. will retry after %v: %v", s.logPrefix, s.connectionCheckInterval.String(), err)
time.Sleep(s.connectionCheckInterval)
totaltime += s.connectionCheckInterval
// continue inner loop to attempt to reconnect
continue
}
c.Unlock()
log.Errorf("%s: reconnecting to cassandra took %v", c.logPrefix, (totaltime - c.connectionCheckTimeout).String())
s.Unlock()
log.Errorf("%s: reconnecting to cassandra took %v", s.logPrefix, (totaltime - s.connectionCheckTimeout).String())
totaltime = 0
if oldSession != nil {
oldSession.Close()
Expand All @@ -105,33 +105,33 @@ OUTER:
}

select {
case <-c.shutdown:
log.Infof("%s: received shutdown, exiting deadConnectionCheck", c.logPrefix)
if c.session != nil && !c.session.Closed() {
c.session.Close()
case <-s.shutdown:
log.Infof("%s: received shutdown, exiting deadConnectionCheck", s.logPrefix)
if s.session != nil && !s.session.Closed() {
s.session.Close()
}
return
case <-ticker.C:
c.RLock()
s.RLock()
// this query should work on all cassandra deployments, but we may need to revisit this
err = c.session.Query("SELECT cql_version FROM system.local").Exec()
err = s.session.Query("SELECT cql_version FROM system.local").Exec()
if err == nil {
totaltime = 0
} else {
totaltime += c.connectionCheckInterval
log.Errorf("%s: could not execute connection check query for %v: %v", c.logPrefix, totaltime.String(), err)
totaltime += s.connectionCheckInterval
log.Errorf("%s: could not execute connection check query for %v: %v", s.logPrefix, totaltime.String(), err)
}
c.RUnlock()
s.RUnlock()
}
}
}

// CurrentSession retrieves the current active cassandra session
// CurrentSession retrieves the current active Cassandra session
//
// If the connection to Cassandra is down, this will block until it can be restored
func (c *CassandraSession) CurrentSession() *gocql.Session {
c.RLock()
session := c.session
c.RUnlock()
func (s *Session) CurrentSession() *gocql.Session {
s.RLock()
session := s.session
s.RUnlock()
return session
}
4 changes: 2 additions & 2 deletions idx/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type CasIdx struct {
memory.MemoryIndex
Config *IdxConfig
cluster *gocql.ClusterConfig
Session *util.CassandraSession
Session *cassandra.Session
metaRecords metaRecordStatusByOrg
writeQueue chan writeReq
shutdown chan struct{}
Expand Down Expand Up @@ -177,7 +177,7 @@ func (c *CasIdx) InitBare() error {
return fmt.Errorf("cassandra-idx: failed to create cassandra session: %s", err)
}

c.Session = util.NewCassandraSession(session, c.cluster, c.shutdown, c.Config.ConnectionCheckTimeout, c.Config.ConnectionCheckInterval, c.Config.Hosts, "cassnadra-idx")
c.Session = cassandra.NewSession(session, c.cluster, c.shutdown, c.Config.ConnectionCheckTimeout, c.Config.ConnectionCheckInterval, c.Config.Hosts, "cassnadra-idx")

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions store/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type ChunkReadRequest struct {
}

type CassandraStore struct {
Session *util.CassandraSession
Session *cassandra.Session
cluster *gocql.ClusterConfig
writeQueues []chan *mdata.ChunkWriteRequest
writeQueueMeters []*stats.Range32
Expand Down Expand Up @@ -225,7 +225,7 @@ func NewCassandraStore(config *StoreConfig, ttls []uint32) (*CassandraStore, err

sd := make(chan struct{})

cs := util.NewCassandraSession(session, cluster, sd, config.ConnectionCheckTimeout, config.ConnectionCheckInterval, config.Addrs, "cassandra_store")
cs := cassandra.NewSession(session, cluster, sd, config.ConnectionCheckTimeout, config.ConnectionCheckInterval, config.Addrs, "cassandra_store")

log.Debugf("cassandra_store: created session with config %+v", config)
c := &CassandraStore{
Expand Down

0 comments on commit 17d2658

Please sign in to comment.