
Commit

add new 'query' cluster mode and better name for modes
fix #989
fix #1013
Dieterbe committed Mar 19, 2019
1 parent 0d06eae commit 3067a21
Showing 5 changed files with 73 additions and 45 deletions.
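In short (a reading of the diff below, not text from the commit itself): the old 'single' and 'multi' cluster modes are renamed to 'full' and 'shard', a new 'query' mode is added for instances that hold no data of their own and only fan queries out to peers, and the deprecated names remain accepted and are converted at startup. Roughly:

    mode = full   # formerly 'single': a standalone node that serves queries from its own data
    mode = shard  # formerly 'multi': the node holds a shard of the data and gossips with peers
    mode = query  # new: no inputs, store, or index; the node only queries its peers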
31 changes: 18 additions & 13 deletions cluster/cluster.go
@@ -16,15 +16,16 @@ type ModeType string
 var counter uint32
 
 const (
+    ModeFull  = "full"
+    ModeShard = "shard"
+    ModeQuery = "query"
+    // deprecated
     ModeSingle = "single"
     ModeMulti  = "multi"
 )
 
 func validMode(m string) bool {
-    if ModeType(m) == ModeSingle || ModeType(m) == ModeMulti {
-        return true
-    }
-    return false
+    return (ModeType(m) == ModeFull || ModeType(m) == ModeShard || ModeType(m) == ModeQuery)
 }
 
 var (
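A note on validMode above: it accepts only the new names, so the deprecated spellings must be rewritten before validation; ConfigProcess in cluster/config.go (further down) does exactly that. A sketch of the resulting behavior (hypothetical calls, assuming the constants above):

    validMode("full")   // true
    validMode("query")  // true
    validMode("single") // false: 'single' is converted to 'full' before this check runs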
@@ -49,10 +50,10 @@ func Init(name, version string, started time.Time, apiScheme string, apiPort int
         Updated: time.Now(),
         local:   true,
     }
-    if Mode == ModeMulti {
-        Manager = NewMemberlistManager(thisNode)
-    } else {
+    if Mode == ModeFull {
         Manager = NewSingleNodeManager(thisNode)
+    } else { // Shard or Query mode
+        Manager = NewMemberlistManager(thisNode)
     }
     // initialize our "primary" state metric.
     nodePrimary.Set(primary)
@@ -80,8 +81,8 @@ type partitionCandidates struct {
 // nodes with the lowest prio.
 func MembersForQuery() ([]Node, error) {
     thisNode := Manager.ThisNode()
-    // If we are running in single mode, just return thisNode
-    if Mode == ModeSingle {
+    // If we are running in full mode, just return thisNode
+    if Mode == ModeFull {
         return []Node{thisNode}, nil
     }
 
@@ -172,8 +173,8 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
     allNodes := Manager.MemberList()
     membersMap := make(map[int32][]Node)
 
-    // If we are running in single mode, just return thisNode
-    if Mode == ModeSingle {
+    // If we are running in full mode, just return thisNode
+    if Mode == ModeFull {
         membersMap[0] = []Node{thisNode}
         return membersMap, nil
     }
@@ -185,10 +186,14 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
         if !member.IsReady() {
             continue
         }
-        memberStartPartition := member.GetPartitions()[0]
+        partitions := member.GetPartitions()
+        if len(partitions) == 0 {
+            continue
+        }
+        memberStartPartition := partitions[0]
 
         if _, ok := membersMap[memberStartPartition]; !ok {
-            peerPartitions += len(member.GetPartitions())
+            peerPartitions += len(partitions)
         }
 
         membersMap[memberStartPartition] = append(membersMap[memberStartPartition], member)
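The added length check guards against a member that is ready but has no partitions assigned, for which member.GetPartitions()[0] would previously panic; a node running in the new query mode, which owns no partitions, is presumably exactly such a peer. A minimal sketch of the failure being prevented (hypothetical, not code from this repo):

    var partitions []int32    // a ready peer with an empty partition list
    first := partitions[0]    // panics: index out of range [0] with length 0
    _ = first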
12 changes: 6 additions & 6 deletions cluster/cluster_test.go
@@ -6,24 +6,24 @@ import (
     "time"
 )
 
-func TestPeersForQuerySingle(t *testing.T) {
-    Mode = ModeSingle
+func TestPeersForQueryFull(t *testing.T) {
+    Mode = ModeFull
     Init("node1", "test", time.Now(), "http", 6060)
     Manager.SetPrimary(true)
     Manager.SetPartitions([]int32{1, 2})
     maxPrio = 10
     Manager.SetPriority(10)
     Manager.SetReady()
-    Convey("when cluster in single mode", t, func() {
+    Convey("when instance is in full mode", t, func() {
         selected, err := MembersForQuery()
         So(err, ShouldBeNil)
         So(selected, ShouldHaveLength, 1)
         So(selected[0], ShouldResemble, Manager.ThisNode())
     })
 }
 
-func TestPeersForQueryMulti(t *testing.T) {
-    Mode = ModeMulti
+func TestPeersForQueryShard(t *testing.T) {
+    Mode = ModeShard
     Init("node1", "test", time.Now(), "http", 6060)
     manager := Manager.(*MemberlistManager)
     manager.SetPrimary(true)
@@ -58,7 +58,7 @@ func TestPeersForQueryMulti(t *testing.T) {
         },
     }
     manager.Unlock()
-    Convey("when cluster in multi mode", t, func() {
+    Convey("when cluster in shard mode", t, func() {
         selected, err := MembersForQuery()
         So(err, ShouldBeNil)
         So(selected, ShouldHaveLength, 2)
14 changes: 11 additions & 3 deletions cluster/config.go
@@ -70,7 +70,7 @@ func ConfigSetup() {
     clusterCfg.StringVar(&ClusterName, "name", "metrictank", "Unique name of the cluster.")
     clusterCfg.BoolVar(&primary, "primary-node", false, "the primary node writes data to cassandra. There should only be 1 primary node per shardGroup.")
     clusterCfg.StringVar(&peersStr, "peers", "", "TCP addresses of other nodes, comma separated. use this if you shard your data and want to query other instances")
-    clusterCfg.StringVar(&mode, "mode", "single", "Operating mode of cluster. (single|multi)")
+    clusterCfg.StringVar(&mode, "mode", "full", "Operating mode of this instance within the cluster. (full|shard|query)")
     clusterCfg.DurationVar(&httpTimeout, "http-timeout", time.Second*60, "How long to wait before aborting http requests to cluster peers and returning a http 503 service unavailable")
     clusterCfg.IntVar(&maxPrio, "max-priority", 10, "maximum priority before a node should be considered not-ready.")
     clusterCfg.IntVar(&minAvailableShards, "min-available-shards", 0, "minimum number of shards that must be available for a query to be handled.")
@@ -101,6 +101,14 @@ func ConfigSetup() {
 
 func ConfigProcess() {
     // check settings in cluster section
+    if mode == ModeSingle {
+        log.Warn("CLU Config: 'single' mode deprecated. converting to 'full' mode")
+        mode = "full"
+    }
+    if mode == ModeMulti {
+        log.Warn("CLU Config: 'multi' mode deprecated. converting to 'shard' mode")
+        mode = "shard"
+    }
     if !validMode(mode) {
         log.Fatal("CLU Config: invalid cluster operating mode")
     }
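For existing deployments this means a legacy config keeps working and is mapped at startup; a sketch based on the warnings above:

    # in an existing config file:
    mode = multi
    # at startup this now logs
    #   CLU Config: 'multi' mode deprecated. converting to 'shard' mode
    # and the process continues in shard mode.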
@@ -125,8 +133,8 @@ func ConfigProcess() {
         Timeout: httpTimeout,
     }
 
-    // all further stuff is only relevant in multi mode
-    if mode != ModeMulti {
+    // all further stuff is only relevant in shard/query mode
+    if mode == ModeFull {
         return
     }
 
57 changes: 36 additions & 21 deletions cmd/metrictank/metrictank.go
@@ -186,8 +186,13 @@ func main() {
     bigtable.ConfigProcess()
     bigtableStore.ConfigProcess(mdata.MaxChunkSpan())
 
-    if !inCarbon.Enabled && !inKafkaMdm.Enabled && !inPrometheus.Enabled {
-        log.Fatal("you should enable at least 1 input plugin")
+    inputEnabled := inCarbon.Enabled || inKafkaMdm.Enabled || inPrometheus.Enabled
+    wantInput := cluster.Mode == cluster.ModeFull || cluster.Mode == cluster.ModeShard
+    if !inputEnabled && wantInput {
+        log.Fatal("you should enable at least 1 input plugin in 'full' or 'shard' cluster mode")
+    }
+    if inputEnabled && !wantInput {
+        log.Fatal("you should not have an input enabled in 'query' cluster mode")
     }
 
     sec := dur.MustParseNDuration("warm-up-period", *warmUpPeriodStr)
@@ -263,8 +268,14 @@ func main() {
     if cassandraStore.CliConfig.Enabled && bigtableStore.CliConfig.Enabled {
         log.Fatal("only 1 backend store plugin can be enabled at once.")
     }
-    if !cassandraStore.CliConfig.Enabled && !bigtableStore.CliConfig.Enabled {
-        log.Fatal("at least 1 backend store plugin needs to be enabled.")
+    if wantInput {
+        if !cassandraStore.CliConfig.Enabled && !bigtableStore.CliConfig.Enabled {
+            log.Fatal("at least 1 backend store plugin needs to be enabled in 'full' or 'shard' cluster mode")
+        }
+    } else {
+        if cassandraStore.CliConfig.Enabled || bigtableStore.CliConfig.Enabled {
+            log.Fatal("no backend store plugin may be enabled in 'query' cluster mode")
+        }
     }
     if bigtableStore.CliConfig.Enabled {
         schemaMaxChunkSpan := mdata.MaxChunkSpan()
@@ -329,10 +340,15 @@ func main() {
 
     idx.OrgIdPublic = uint32(*publicOrg)
 
+    idxEnabled := memory.Enabled || cassandra.CliConfig.Enabled || bigtable.CliConfig.Enabled
+    if !idxEnabled && wantInput {
+        log.Fatal("you should enable 1 index plugin in 'full' or 'shard' cluster mode")
+    }
+    if idxEnabled && !wantInput {
+        log.Fatal("you should not have an index plugin enabled in 'query' cluster mode")
+    }
+
     if memory.Enabled {
-        if metricIndex != nil {
-            log.Fatal("Only 1 metricIndex handler can be enabled.")
-        }
         metricIndex = memory.New()
     }
     if cassandra.CliConfig.Enabled {
@@ -348,10 +364,6 @@ func main() {
         metricIndex = bigtable.New(bigtable.CliConfig)
     }
 
-    if metricIndex == nil {
-        log.Fatal("No metricIndex handlers enabled.")
-    }
-
     /***********************************
         Initialize our API server
     ***********************************/
@@ -372,24 +384,27 @@ func main() {
     /***********************************
         Load index entries from the backend store.
     ***********************************/
-    err = metricIndex.Init()
-    if err != nil {
-        log.Fatalf("failed to initialize metricIndex: %s", err.Error())
+    if wantInput {
+        err = metricIndex.Init()
+        if err != nil {
+            log.Fatalf("failed to initialize metricIndex: %s", err.Error())
+        }
+        log.Infof("metricIndex initialized in %s. starting data consumption", time.Now().Sub(pre))
     }
-    log.Infof("metricIndex initialized in %s. starting data consumption", time.Now().Sub(pre))
 
     /***********************************
         Initialize MetricPersist notifiers
     ***********************************/
     var notifiers []mdata.Notifier
-    if notifierKafka.Enabled {
-        // The notifierKafka notifiers will block here until it has processed the backlog of metricPersist messages.
-        // it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
-        notifiers = append(notifiers, notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)))
+    if wantInput {
+        if notifierKafka.Enabled {
+            // The notifierKafka notifiers will block here until it has processed the backlog of metricPersist messages.
+            // it will block for at most kafka-cluster.backlog-process-timeout (default 60s)
+            notifiers = append(notifiers, notifierKafka.New(*instance, mdata.NewDefaultNotifierHandler(metrics, metricIndex)))
+        }
+        mdata.InitPersistNotifier(notifiers...)
     }
-
-    mdata.InitPersistNotifier(notifiers...)
 
     /***********************************
         Start our inputs
     ***********************************/
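Taken together, the checks added to this file amount to the following per-mode requirements (a summary inferred from the diff above, not a table from the source):

    mode  | input plugins | backend store | index plugin | index init & metricPersist notifiers
    full  | >= 1 required | required      | required     | performed
    shard | >= 1 required | required      | required     | performed
    query | must be off   | must be off   | must be off  | skipped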
4 changes: 2 additions & 2 deletions metrictank-sample.ini
@@ -263,8 +263,8 @@ max-priority = 10
 # TCP addresses of other nodes, comma separated. use this if you shard your data and want to query other instances.
 # If no port is specified, it is assumed the other nodes are using the same port this node is listening on.
 peers =
-# Operating mode of cluster. (single|multi)
-mode = single
+# Operating mode of this instance within the cluster. (full|shard|query)
+mode = full
 # minimum number of shards that must be available for a query to be handled.
 min-available-shards = 0
 # How long to wait before aborting http requests to cluster peers and returning a http 503 service unavailable
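A minimal clustering section for a query-mode instance might then look like this (a hypothetical sketch; the host names are made up, and the options are assumed to live in the same section of the sample file as those shown above):

    mode = query
    peers = shard-node-1:6060,shard-node-2:6060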
