Skip to content
This repository has been archived by the owner on Jul 1, 2023. It is now read-only.

Check for "active" status #233

Merged
merged 9 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
go.opencensus.io v0.0.0-20181129005706-8b019f31bc1c // indirect
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/oauth2 v0.0.0-20181128211412-28207608b838
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f // indirect
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
google.golang.org/api v0.0.0-20181129220737-af4fc4062c26 // indirect
google.golang.org/appengine v1.3.0 // indirect
google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
Expand Down
4 changes: 2 additions & 2 deletions infra/gravity/cluster_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *TestContext) uploadInstaller(master Gravity, nodes []Gravity, installer
return trace.Wrap(err)
}

err = c.Status(nodes)
err = c.WaitForActiveStatus(nodes)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -232,7 +232,7 @@ func (c *TestContext) upgrade(master Gravity, numNodes int) error {

// ExecScript will run and execute a script on all nodes
func (c *TestContext) ExecScript(nodes []Gravity, scriptUrl string, args []string) error {
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.ExecScript)
wadells marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

errs := make(chan error, len(nodes))
Expand Down
4 changes: 2 additions & 2 deletions infra/gravity/cluster_resize.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (c *TestContext) Expand(currentCluster, nodesToJoin []Gravity, p InstallPar
// status is solely used for gathering the join token, can this be replaced
// with InstallParam.Token -- 2020-05 walt
peer := currentCluster[0]
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.NodeStatus)
defer cancel()
status, err := peer.Status(ctx)
if err != nil {
Expand All @@ -41,7 +41,7 @@ func (c *TestContext) Expand(currentCluster, nodesToJoin []Gravity, p InstallPar
// JoinNode has one node join a peer already in a cluster
func (c *TestContext) JoinNode(peer, nodeToJoin Gravity, p InstallParam) error {

ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.NodeStatus)
defer cancel()
status, err := peer.Status(ctx)
if err != nil {
Expand Down
115 changes: 88 additions & 27 deletions infra/gravity/cluster_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,105 @@ package gravity

import (
"context"
"time"

"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"

"github.com/gravitational/robotest/lib/constants"
sshutils "github.com/gravitational/robotest/lib/ssh"
"github.com/gravitational/robotest/lib/utils"
"github.com/gravitational/robotest/lib/wait"

"github.com/gravitational/trace"
)

// Status walks around all nodes and checks whether they all feel OK
func (c *TestContext) Status(nodes []Gravity) error {
c.Logger().WithField("nodes", Nodes(nodes)).Info("Check status on nodes.")
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
defer cancel()
// statusValidator returns nil if the Gravity Status is the expected status or an error otherwise.
type statusValidator func(s GravityStatus) error

retry := wait.Retryer{
Attempts: 100,
Delay: time.Second * 20,
// checkNotDegraded returns an error if the cluster status is Degraded.
//
// This function is a reimplementation of the logic in https://github.com/gravitational/gravity/blob/7.0.0/lib/status/status.go#L180-L185
func checkNotDegraded(s GravityStatus) error {
if s.Cluster.State == constants.ClusterStateDegraded {
return trace.CompareFailed("cluster state %q", s.Cluster.State)
}
if s.Cluster.SystemStatus != constants.SystemStatus_Running {
return trace.CompareFailed("expected system_status %v, found %v", constants.SystemStatus_Running, s.Cluster.SystemStatus)
}
return nil
}

err := retry.Do(ctx, func() error {
errs := make(chan error, len(nodes))
// checkActive returns an error if the cluster is degraded or state != active.
func checkActive(s GravityStatus) error {
if err := checkNotDegraded(s); err != nil {
return trace.Wrap(err)
}
if s.Cluster.State != constants.ClusterStateActive {
return trace.CompareFailed("expected state %q, found %q", constants.ClusterStateActive, s.Cluster.State)
}
return nil
}

for _, node := range nodes {
go func(n Gravity) {
_, err := n.Status(ctx)
errs <- err
}(node)
}
// WaitForActiveStatus blocks until all nodes report state = Active and notDegraded or an internal timeout expires.
func (c *TestContext) WaitForActiveStatus(nodes []Gravity) error {
wadells marked this conversation as resolved.
Show resolved Hide resolved
c.Logger().WithField("nodes", Nodes(nodes)).Info("Waiting for active status.")
return c.WaitForStatus(nodes, checkActive)
}

err := utils.CollectErrors(ctx, errs)
if err == nil {
return nil
// WaitForStatus blocks until all nodes satisfy the expected statusValidator or an internal timeout expires.
func (c *TestContext) WaitForStatus(nodes []Gravity, expected statusValidator) error {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = c.timeouts.ClusterStatus

expectStatus := func() (err error) {
statuses, err := c.Status(nodes)
if err != nil {
return trace.Wrap(err)
}
for _, status := range statuses {
err = expected(status)
if err != nil {
c.Logger().WithError(err).WithField("status", status).Warn("Unexpected Status.")
return trace.Wrap(err)
}
}
c.Logger().Warnf("Status not available on some nodes, will retry: %v.", err)
return wait.Continue("status not ready on some nodes")
})
return nil
}

err := wait.RetryWithInterval(c.ctx, b, expectStatus, c.Logger())

return trace.Wrap(err)

}

// Status queries `gravity status` once from each node in nodes.
func (c *TestContext) Status(nodes []Gravity) (statuses []GravityStatus, err error) {
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.NodeStatus)
wadells marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

valueC := make(chan GravityStatus, len(nodes))
g, ctx := errgroup.WithContext(ctx)
for _, node := range nodes {
node := node
g.Go(func() error {
status, err := node.Status(ctx)
if err != nil {
return trace.Wrap(err)
}
if status != nil {
valueC <- *status
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, trace.Wrap(err)
}
close(valueC)
for status := range valueC {
statuses = append(statuses, status)
}
return statuses, nil
}

// CheckTime walks around all nodes and checks whether their time is within acceptable limits
Expand All @@ -53,9 +113,8 @@ func (c *TestContext) CheckTimeSync(nodes []Gravity) error {
})
}

ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.TimeSync)
defer cancel()

err := sshutils.CheckTimeSync(ctx, timeNodes)
return trace.Wrap(err)
}
Expand Down Expand Up @@ -122,7 +181,7 @@ type ClusterNodesByRole struct {

// NodesByRole will conveniently organize nodes according to their roles in cluster
func (c *TestContext) NodesByRole(nodes []Gravity) (roles *ClusterNodesByRole, err error) {
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.Status)
ctx, cancel := context.WithTimeout(c.ctx, c.timeouts.ResolveInPlanet)
defer cancel()

roles = &ClusterNodesByRole{}
Expand All @@ -131,6 +190,8 @@ func (c *TestContext) NodesByRole(nodes []Gravity) (roles *ClusterNodesByRole, e
return nil, trace.Wrap(err)
}

ctx, cancel = context.WithTimeout(c.ctx, c.timeouts.GetPods)
defer cancel()
// Run query on the apiserver
pods, err := KubectlGetPods(ctx, roles.ApiMaster, kubeSystemNS, appGravityLabel)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion infra/gravity/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ var DefaultTimeouts = OpTimeouts{
Upgrade: time.Minute * 30, // upgrade threshold per node
Uninstall: time.Minute * 5, // uninstall threshold per node
UninstallApp: time.Minute * 5, // application uninstall threshold
Status: time.Minute * 30, // sufficient for failover procedures
NodeStatus: time.Minute * 1, // limit for status to return on a single node
ClusterStatus: time.Minute * 5, // limit for status to queisce across the cluster
Leave: time.Minute * 15, // threshold to leave cluster
CollectLogs: time.Minute * 7, // to collect logs from node
WaitForInstaller: time.Minute * 30, // wait for build to complete in parallel
AutoScaling: time.Minute * 10, // wait for autoscaling operation
TimeSync: time.Minute * 5, // wait for ntp to converge
ResolveInPlanet: time.Minute * 1, // resolve a hostname inside planet with dig
GetPods: time.Minute * 1, // use kubectl to query pods on the API master
ExecScript: time.Minute * 5, // user provided script, this should be specified by the user
}
72 changes: 65 additions & 7 deletions infra/gravity/gravity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@ package gravity
import (
"bufio"
"bytes"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

var testStatusStr = []byte(`
func TestGravityOutput(t *testing.T) {
var testStatusStr = []byte(`
{"cluster":{"application":{"repository":"gravitational.io","name":"telekube","version":"0.0.1"},"state":"active","domain":"testcluster","token":{"token":"fac3b88014367fe4e98a8664755e2be4","expires":"0001-01-01T00:00:00Z","type":"expand","account_id":"00000000-0000-0000-0000-000000000001","site_domain":"testcluster","operation_id":"","user_email":"agent@testcluster"},"operation":{"type":"operation_install","id":"55298dfd-2094-47a3-a787-8b2a546c0fd1","state":"completed","created":"2008-01-01T12:00:00.0Z","progress":{"message":"Operation has completed","completion":100,"created":"2008-01-01T12:05:00.0Z"}},"system_status":1,"nodes":[{"hostname":"node-0","advertise_ip":"10.40.2.4","role":"master","profile":"node","status":"healthy"},{"hostname":"node-2","advertise_ip":"10.40.2.5","role":"master","profile":"node","status":"healthy"},{"hostname":"node-1","advertise_ip":"10.40.2.7","role":"master","profile":"node","status":"healthy"},{"hostname":"node-5","advertise_ip":"10.40.2.6","role":"node","profile":"node","status":"healthy"},{"hostname":"node-3","advertise_ip":"10.40.2.3","role":"node","profile":"node","status":"healthy"},{"hostname":"node-4","advertise_ip":"10.40.2.2","role":"node","profile":"node","status":"healthy"}]}}
`)

func TestGravityOutput(t *testing.T) {
expectedStatus := &GravityStatus{
Cluster: ClusterStatus{
Cluster: "testcluster",
Application: Application{Name: "telekube"},
Status: "active",
Token: Token{Token: "fac3b88014367fe4e98a8664755e2be4"},
Cluster: "testcluster",
Application: Application{Name: "telekube"},
State: "active",
SystemStatus: 1,
Token: Token{Token: "fac3b88014367fe4e98a8664755e2be4"},
Nodes: []NodeStatus{
NodeStatus{Addr: "10.40.2.4"},
NodeStatus{Addr: "10.40.2.5"},
Expand All @@ -35,3 +36,60 @@ func TestGravityOutput(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expectedStatus, &status, "parseStatus")
}

func TestHealthyStatusValidation(t *testing.T) {
healthyStatus := GravityStatus{
Cluster: ClusterStatus{
Cluster: "robotest",
Application: Application{Name: "telekube"},
State: "active",
SystemStatus: 1,
Token: Token{Token: "ROBOTEST"},
Nodes: []NodeStatus{
NodeStatus{Addr: "10.1.2.3"},
NodeStatus{Addr: "10.1.2.4"},
NodeStatus{Addr: "10.1.2.5"},
},
},
}
err := checkActive(healthyStatus)
assert.NoError(t, err)
}

// Test1523StatusValidation ensures expanding status is
// identified as "unsafe to proceed" by Robotest.
//
// Expands may be unexpectedly seen after install as discussed
// in https://github.com/gravitational/gravity/issues/1523.
func Test1523StatusValidation(t *testing.T) {
nonActiveStatus := GravityStatus{
Cluster: ClusterStatus{
Cluster: "robotest",
Application: Application{Name: "telekube"},
State: "expanding",
Token: Token{Token: "ROBOTEST"},
Nodes: []NodeStatus{
NodeStatus{Addr: "10.1.2.3"},
},
},
}
err := checkActive(nonActiveStatus)
assert.Error(t, err)
}

// Test1641StatusValidation ensures a particular status type seen
// in the field identified as degraded by Robotest.
//
// See https://github.com/gravitational/gravity/issues/1641 for more info.
func Test1641StatusValidation(t *testing.T) {
f, err := os.Open("testdata/status-degraded-1641.json")
assert.NoError(t, err)
defer f.Close()

wadells marked this conversation as resolved.
Show resolved Hide resolved
var status GravityStatus
err = parseStatus(&status)(bufio.NewReader(f))
assert.NoError(t, err)

err = checkNotDegraded(status)
assert.Error(t, err)
}
32 changes: 5 additions & 27 deletions infra/gravity/node_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ type JoinCmd struct {
StateDir string
}

// IsDegraded determines whether the cluster is in degraded state
func (r GravityStatus) IsDegraded() bool {
return r.Cluster.Status == "degraded"
}

// GravityStatus describes the status of the Gravity cluster
type GravityStatus struct {
// Cluster describes the cluster status
Expand All @@ -142,8 +137,10 @@ type ClusterStatus struct {
Application Application `json:"application"`
// Cluster is the name of the cluster
Cluster string `json:"domain"`
// Status is the cluster status
Status string `json:"state"`
// State is the cluster state
State string `json:"state"`
// SystemStatus is the cluster status, see https://github.com/gravitational/satellite/blob/7.1.0/agent/proto/agentpb/agent.proto#L50-L54
wadells marked this conversation as resolved.
Show resolved Hide resolved
SystemStatus int `json:"system_status"`
// Token is secure token which prevents rogue nodes from joining the cluster during installation
Token Token `json:"token"`
// Nodes describes the nodes in the cluster
Expand Down Expand Up @@ -284,26 +281,7 @@ var installCmdTemplate = template.Must(
`))

// Status queries cluster status
func (g *gravity) Status(ctx context.Context) (status *GravityStatus, err error) {
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = defaults.ClusterStatusTimeout
err = wait.RetryWithInterval(ctx, b, func() (err error) {
status, err = g.status(ctx)
if err != nil {
return trace.Wrap(err)
}
if status.IsDegraded() {
return trace.BadParameter("degraded")
}
return nil
}, g.log)
if err != nil {
return nil, trace.Wrap(err)
}
return status, nil
}

func (g *gravity) status(ctx context.Context) (*GravityStatus, error) {
func (g *gravity) Status(ctx context.Context) (*GravityStatus, error) {
cmd := fmt.Sprintf("sudo gravity status --output=json --system-log-file=%v",
defaults.AgentLogPath)
status := GravityStatus{}
Expand Down
9 changes: 7 additions & 2 deletions infra/gravity/testcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@ const (

// OpTimeouts defines per-node, per-operation timeouts which would be used to determine
// whether test must be failed
// provisioner has its own timeout / restart logic which is dependant on cloud provider and terraform
// provisioner has its own timeout / restart logic which is dependent on cloud provider and terraform
type OpTimeouts struct {
Install time.Duration
Upgrade time.Duration
Status time.Duration
NodeStatus time.Duration
ClusterStatus time.Duration
Uninstall time.Duration
UninstallApp time.Duration
Leave time.Duration
CollectLogs time.Duration
WaitForInstaller time.Duration
AutoScaling time.Duration
TimeSync time.Duration
ResolveInPlanet time.Duration
GetPods time.Duration
ExecScript time.Duration
}

// TestContext aggregates common parameters for better test suite readability
Expand Down
Loading