Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add new 'query' cluster mode and better name for modes #1243

Merged
merged 10 commits into from
Mar 27, 2019
42 changes: 18 additions & 24 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,10 @@ import (
opentracing "github.com/opentracing/opentracing-go"
)

type ModeType string

var counter uint32

const (
ModeSingle = "single"
ModeMulti = "multi"
)

func validMode(m string) bool {
if ModeType(m) == ModeSingle || ModeType(m) == ModeMulti {
return true
}
return false
}

var (
Mode ModeType
Mode NodeMode
Manager ClusterManager
Tracer opentracing.Tracer

Expand All @@ -44,15 +30,19 @@ func Init(name, version string, started time.Time, apiScheme string, apiPort int
Version: version,
Primary: primary,
Priority: 10000,
Mode: Mode,
PrimaryChange: time.Now(),
StateChange: time.Now(),
Updated: time.Now(),
local: true,
}
if Mode == ModeMulti {
Manager = NewMemberlistManager(thisNode)
} else {
if Mode == ModeQuery {
thisNode.Priority = 0
}
if Mode == ModeDev {
Manager = NewSingleNodeManager(thisNode)
} else { // Shard or Query mode
Manager = NewMemberlistManager(thisNode)
}
// initialize our "primary" state metric.
nodePrimary.Set(primary)
Expand Down Expand Up @@ -80,8 +70,8 @@ type partitionCandidates struct {
// nodes with the lowest prio.
func MembersForQuery() ([]Node, error) {
thisNode := Manager.ThisNode()
// If we are running in single mode, just return thisNode
if Mode == ModeSingle {
// If we are running in dev mode, just return thisNode
if Mode == ModeDev {
return []Node{thisNode}, nil
}

Expand Down Expand Up @@ -172,8 +162,8 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
allNodes := Manager.MemberList()
membersMap := make(map[int32][]Node)

// If we are running in single mode, just return thisNode
if Mode == ModeSingle {
// If we are running in dev mode, just return thisNode
if Mode == ModeDev {
membersMap[0] = []Node{thisNode}
return membersMap, nil
}
Expand All @@ -185,10 +175,14 @@ func MembersForSpeculativeQuery() (map[int32][]Node, error) {
if !member.IsReady() {
continue
}
memberStartPartition := member.GetPartitions()[0]
partitions := member.GetPartitions()
if len(partitions) == 0 {
continue
}
memberStartPartition := partitions[0]

if _, ok := membersMap[memberStartPartition]; !ok {
peerPartitions += len(member.GetPartitions())
peerPartitions += len(partitions)
}

membersMap[memberStartPartition] = append(membersMap[memberStartPartition], member)
Expand Down
15 changes: 9 additions & 6 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ import (
"time"
)

func TestPeersForQuerySingle(t *testing.T) {
Mode = ModeSingle
func TestPeersForQueryDev(t *testing.T) {
Mode = ModeDev
Init("node1", "test", time.Now(), "http", 6060)
Manager.SetPrimary(true)
Manager.SetPartitions([]int32{1, 2})
maxPrio = 10
Manager.SetPriority(10)
Manager.SetReady()
Convey("when cluster in single mode", t, func() {
Convey("when instance is in dev mode", t, func() {
selected, err := MembersForQuery()
So(err, ShouldBeNil)
So(selected, ShouldHaveLength, 1)
So(selected[0], ShouldResemble, Manager.ThisNode())
})
}

func TestPeersForQueryMulti(t *testing.T) {
Mode = ModeMulti
func TestPeersForQueryShard(t *testing.T) {
Mode = ModeShard
Init("node1", "test", time.Now(), "http", 6060)
manager := Manager.(*MemberlistManager)
manager.SetPrimary(true)
Expand All @@ -39,26 +39,29 @@ func TestPeersForQueryMulti(t *testing.T) {
Name: "node2",
Primary: true,
Partitions: []int32{1, 2},
Mode: ModeShard,
State: NodeReady,
Priority: 10,
},
"node3": {
Name: "node3",
Primary: true,
Partitions: []int32{3, 4},
Mode: ModeShard,
State: NodeReady,
Priority: 10,
},
"node4": {
Name: "node4",
Primary: true,
Partitions: []int32{3, 4},
Mode: ModeShard,
State: NodeReady,
Priority: 10,
},
}
manager.Unlock()
Convey("when cluster in multi mode", t, func() {
Convey("when cluster in shard mode", t, func() {
selected, err := MembersForQuery()
So(err, ShouldBeNil)
So(selected, ShouldHaveLength, 2)
Expand Down
23 changes: 16 additions & 7 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ var (
minAvailableShards int
gcPercent int
gcPercentNotReady int
GossipSettlePeriod time.Duration // if gossip not enabled, will be 0 regardless of config

gossipSettlePeriodStr string

swimUseConfig = "default-lan"
swimAdvertiseAddrStr string
Expand Down Expand Up @@ -70,11 +73,12 @@ func ConfigSetup() {
clusterCfg.StringVar(&ClusterName, "name", "metrictank", "Unique name of the cluster.")
clusterCfg.BoolVar(&primary, "primary-node", false, "the primary node writes data to cassandra. There should only be 1 primary node per shardGroup.")
clusterCfg.StringVar(&peersStr, "peers", "", "TCP addresses of other nodes, comma separated. use this if you shard your data and want to query other instances")
clusterCfg.StringVar(&mode, "mode", "single", "Operating mode of cluster. (single|multi)")
clusterCfg.StringVar(&mode, "mode", "dev", "Operating mode of this instance within the cluster. (dev|shard|query)")
clusterCfg.DurationVar(&httpTimeout, "http-timeout", time.Second*60, "How long to wait before aborting http requests to cluster peers and returning a http 503 service unavailable")
clusterCfg.IntVar(&maxPrio, "max-priority", 10, "maximum priority before a node should be considered not-ready.")
clusterCfg.IntVar(&minAvailableShards, "min-available-shards", 0, "minimum number of shards that must be available for a query to be handled.")
clusterCfg.IntVar(&gcPercentNotReady, "gc-percent-not-ready", gcPercent, "GOGC value to use when node is not ready. Defaults to GOGC")
clusterCfg.StringVar(&gossipSettlePeriodStr, "gossip-settle-period", "10s", "duration until when the cluster topology can be considered up-to-date and this node to be ready to serve requests (when gossip enabled).")
globalconf.Register("cluster", clusterCfg, flag.ExitOnError)

swimCfg := flag.NewFlagSet("swim", flag.ExitOnError)
Expand All @@ -101,12 +105,12 @@ func ConfigSetup() {

func ConfigProcess() {
// check settings in cluster section
if !validMode(mode) {
log.Fatal("CLU Config: invalid cluster operating mode")
var err error
Mode, err = NodeModeFromString(mode)
if err != nil {
log.Fatalf("CLU Config: %s", err.Error())
}

Mode = ModeType(mode)

if httpTimeout == 0 {
log.Fatal("CLU Config: http-timeout must be a non-zero duration string like 60s")
}
Expand All @@ -125,11 +129,16 @@ func ConfigProcess() {
Timeout: httpTimeout,
}

// all further stuff is only relevant in multi mode
if mode != ModeMulti {
// all further stuff is only relevant in shard/query mode
if Mode == ModeDev {
return
}

GossipSettlePeriod, err = time.ParseDuration(gossipSettlePeriodStr)
if err != nil {
log.Fatalf("CLU Config: invalid gossip-settle-period: %s", err.Error())
}

// check settings in swim section
if swimUseConfig != "manual" && swimUseConfig != "default-lan" && swimUseConfig != "default-local" && swimUseConfig != "default-wan" {
log.Fatal("CLU Config: invalid swim-use-config setting")
Expand Down
16 changes: 15 additions & 1 deletion cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ var (
totalSecondaryReady = stats.NewGauge32("cluster.total.state.secondary-ready")
// metric cluster.total.state.secondary-not-ready is the number of nodes we know to be secondary and not ready
totalSecondaryNotReady = stats.NewGauge32("cluster.total.state.secondary-not-ready")
// metric cluster.total.state.query-ready is the number of nodes we know to be query nodes and ready
totalQueryReady = stats.NewGauge32("cluster.total.state.query-ready")
// metric cluster.total.state.query-not-ready is the number of nodes we know to be query nodes and not ready
totalQueryNotReady = stats.NewGauge32("cluster.total.state.query-not-ready")
// metric cluster.total.partitions is the number of partitions in the cluster that we know of
totalPartitions = stats.NewGauge32("cluster.total.partitions")

Expand Down Expand Up @@ -184,6 +188,8 @@ func (c *MemberlistManager) clusterStats() {
primNotReady := 0
secReady := 0
secNotReady := 0
queryReady := 0
queryNotReady := 0
partitions := make(map[int32]int)
for _, p := range c.members {
if p.Primary {
Expand All @@ -192,12 +198,18 @@ func (c *MemberlistManager) clusterStats() {
} else {
primNotReady++
}
} else {
} else if p.Mode != ModeQuery {
if p.IsReady() {
secReady++
} else {
secNotReady++
}
} else {
if p.IsReady() {
queryReady++
} else {
queryNotReady++
}
}
for _, partition := range p.Partitions {
partitions[partition]++
Expand All @@ -208,6 +220,8 @@ func (c *MemberlistManager) clusterStats() {
totalPrimaryNotReady.Set(primNotReady)
totalSecondaryReady.Set(secReady)
totalSecondaryNotReady.Set(secNotReady)
totalQueryReady.Set(queryReady)
totalQueryNotReady.Set(queryNotReady)

totalPartitions.Set(len(partitions))
}
Expand Down
54 changes: 54 additions & 0 deletions cluster/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,59 @@ import (
log "github.com/sirupsen/logrus"
)

type InvalidNodeModeErr string

func (e InvalidNodeModeErr) Error() string {
return fmt.Sprintf("invalid cluster operating mode %q", string(e))
}

//go:generate stringer -type=NodeMode -trimprefix=Mode
type NodeMode uint8

const (
ModeShard NodeMode = iota
ModeDev
ModeQuery
)

// capitalized form is what stringer (.String()) generates and is used for json serialization
func NodeModeFromString(mode string) (NodeMode, error) {
switch mode {
case "single":
log.Warn("CLU Config: 'single' mode deprecated. converting to 'dev' mode")
return ModeDev, nil
case "multi":
log.Warn("CLU Config: 'multi' mode deprecated. converting to 'shard' mode")
return ModeShard, nil
case "dev", "Dev":
return ModeDev, nil
case "shard", "Shard":
return ModeShard, nil
case "query", "Query":
return ModeQuery, nil
}
return 0, InvalidNodeModeErr(mode)
}

// MarshalJSON marshals a NodeMode
func (n NodeMode) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString(`"`)
buffer.WriteString(n.String())
buffer.WriteString(`"`)
return buffer.Bytes(), nil
}

// UnmarshalJSON unmashals a NodeMode
func (n *NodeMode) UnmarshalJSON(b []byte) error {
var j string
err := json.Unmarshal(b, &j)
if err != nil {
return err
}
*n, err = NodeModeFromString(j)
return err
}

//go:generate stringer -type=NodeState
type NodeState int

Expand Down Expand Up @@ -98,6 +151,7 @@ type HTTPNode struct {
Version string `json:"version"`
Primary bool `json:"primary"`
PrimaryChange time.Time `json:"primaryChange"`
Mode NodeMode `json:"mode"`
State NodeState `json:"state"`
Priority int `json:"priority"`
Started time.Time `json:"started"`
Expand Down
25 changes: 25 additions & 0 deletions cluster/nodemode_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading