This repository has been archived by the owner on Sep 9, 2020. It is now read-only.

Do not scale a job group when it is in deployment. #56

Merged
merged 1 commit into from
Sep 30, 2019
1 change: 1 addition & 0 deletions go.mod
@@ -5,6 +5,7 @@ go 1.12
require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878
github.com/davecgh/go-spew v1.1.1
github.com/gofrs/uuid v3.2.0+incompatible
github.com/gorilla/mux v1.7.1
github.com/hashicorp/consul/api v1.1.0
20 changes: 17 additions & 3 deletions pkg/autoscale/handler.go
@@ -100,9 +100,23 @@ func (a *AutoScale) Run() {

for job := range allPolicies {

// The invoke call will block until there is a free thread.
if err := a.pool.Invoke(&workerPayload{jobID: job, policy: allPolicies[job]}); err != nil {
a.logger.Error().Err(err).Msg("failed to invoke autoscaling worker thread")
// Create a new policy object to track groups that are not considered to be in
// deployment.
nonDeploying := make(map[string]*policy.GroupScalingPolicy)

// Iterate the group policies, and check whether they are in deployment or not.
for group := range allPolicies[job] {
if !a.scaler.JobGroupIsDeploying(job, group) {
nonDeploying[group] = allPolicies[job][group]
}
}

// If we have groups within the job that are not deploying, we can trigger a
// scaling event.
if len(nonDeploying) > 0 {
if err := a.pool.Invoke(&workerPayload{jobID: job, policy: allPolicies[job]}); err != nil {
a.logger.Error().Err(err).Msg("failed to invoke autoscaling worker thread")
}
}
}
a.setScalingInProgressFalse()
12 changes: 12 additions & 0 deletions pkg/scale/consts.go
@@ -12,6 +12,18 @@ type Scale interface {
// Trigger performs scaling of 1 or more job groups which belong to the same job.
Trigger(string, []*GroupReq, state.Source) (*ScalingResponse, int, error)

// GetDeploymentChannel is used to return the channel where updates to Nomad deployments should
// be sent.
GetDeploymentChannel() chan interface{}

// RunDeploymentUpdateHandler is used to trigger the long running process which handles
// messages sent to the deployment update channel.
RunDeploymentUpdateHandler()

// JobGroupIsDeploying checks internal references to identify if the queried job group is
// currently in deployment.
JobGroupIsDeploying(job, group string) bool

checkJobGroupExists(*api.Job, string) *api.TaskGroup

getNewGroupCount(*api.TaskGroup, *GroupReq) int
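The three methods added to the Scale interface above are meant to work together: a watcher feeds deployment updates into the channel, the long-running handler consumes them, and JobGroupIsDeploying is the read path consulted before any scaling decision. A minimal wiring sketch, assuming only the interface shown above; the helper names (trackDeployments, guardScale) are illustrative and not part of this change:

package example

import (
	"github.com/hashicorp/nomad/api"
	"github.com/jrasell/sherpa/pkg/scale"
)

// trackDeployments starts the scaler's update handler and forwards deployment
// updates onto the channel it listens on.
func trackDeployments(s scale.Scale, updates <-chan *api.Deployment) {
	go s.RunDeploymentUpdateHandler()

	go func() {
		ch := s.GetDeploymentChannel()
		for d := range updates {
			ch <- d
		}
	}()
}

// guardScale shows the intended read path: skip scaling while a group deploys.
func guardScale(s scale.Scale, job, group string) bool {
	return !s.JobGroupIsDeploying(job, group)
}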
71 changes: 71 additions & 0 deletions pkg/scale/deployments.go
@@ -0,0 +1,71 @@
package scale

import (
"github.com/hashicorp/nomad/api"
)

// deploymentsKey is a composite key used for storing in-progress Nomad deployments.
type deploymentsKey struct {
job, group string
}

// GetDeploymentChannel will return the channel where deployment updates should be sent.
func (s *Scaler) GetDeploymentChannel() chan interface{} {
return s.deploymentUpdateChan
}

// JobGroupIsDeploying returns a boolean to indicate whether or not the specified job group is
// currently in deployment.
func (s *Scaler) JobGroupIsDeploying(job, group string) bool {
s.deploymentsLock.RLock()
_, ok := s.deployments[deploymentsKey{job: job, group: group}]
s.deploymentsLock.RUnlock()
return ok
}

// RunDeploymentUpdateHandler is used to handle updates and shutdowns when monitoring Nomad job
// deployments.
func (s *Scaler) RunDeploymentUpdateHandler() {
s.logger.Info().Msg("starting scaler deployment update handler")

for {
select {
case <-s.shutdownChan:
return
case msg := <-s.deploymentUpdateChan:
go s.handleDeploymentMessage(msg)
}
}
}

func (s *Scaler) handleDeploymentMessage(msg interface{}) {
deployment, ok := msg.(*api.Deployment)
if !ok {
s.logger.Error().Msg("received unexpected deployment update message type")
return
}
s.logger.Debug().
Str("status", deployment.Status).
Str("job", deployment.JobID).
Msg("received deployment update message to handle")

s.deploymentsLock.Lock()
defer s.deploymentsLock.Unlock()

switch deployment.Status {
case "running":
// If the deployment is running, then we need to ensure that this is correctly tracked in
// the scaler.
for tg := range deployment.TaskGroups {
s.deployments[deploymentsKey{job: deployment.JobID, group: tg}] = nil
}

default:
// The default is used to catch paused, cancelled, failed, and successful deployments.
// These result in the internal tracking of the deployment being removed, indicating that
// the job group is not in deployment and can therefore be scaled.
for tg := range deployment.TaskGroups {
delete(s.deployments, deploymentsKey{job: deployment.JobID, group: tg})
}
}
}
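To make the tracking behaviour above concrete, here is a hypothetical in-package test sketch (not part of this PR) that drives handleDeploymentMessage directly, using only the Scaler fields and helpers shown in this diff:

package scale

import (
	"testing"

	"github.com/hashicorp/nomad/api"
	"github.com/rs/zerolog"
	"github.com/stretchr/testify/assert"
)

func TestScaler_handleDeploymentMessage_sketch(t *testing.T) {
	s := &Scaler{
		logger:      zerolog.Nop(),
		deployments: make(map[deploymentsKey]interface{}),
	}

	d := &api.Deployment{
		JobID:      "example-job",
		Status:     "running",
		TaskGroups: map[string]*api.DeploymentState{"web": {}},
	}

	// A running deployment adds a tracking entry for each task group.
	s.handleDeploymentMessage(d)
	assert.True(t, s.JobGroupIsDeploying("example-job", "web"))

	// Any other status (paused, cancelled, failed, successful) removes the
	// entry again, so the group becomes scalable once more.
	d.Status = "successful"
	s.handleDeploymentMessage(d)
	assert.False(t, s.JobGroupIsDeploying("example-job", "web"))
}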
51 changes: 51 additions & 0 deletions pkg/scale/deployments_test.go
@@ -0,0 +1,51 @@
package scale

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestScaler_JobGroupIsDeploying(t *testing.T) {
testCases := []struct {
storedMap map[deploymentsKey]interface{}
inputJob string
inputGroup string
expectedResult bool
name string
}{
{
storedMap: generateStoredMap(),
inputJob: "test-job-1",
inputGroup: "test-group-1",
expectedResult: true,
name: "job group within stored map",
},
{
storedMap: generateStoredMap(),
inputJob: "test-job-2",
inputGroup: "test-group-2",
expectedResult: false,
name: "job group not within stored map",
},
{
storedMap: generateStoredMap(),
inputJob: "test-job-1",
inputGroup: "test-group-2",
expectedResult: false,
name: "job exists but group not within stored map",
},
}

for _, tc := range testCases {
s := Scaler{deployments: tc.storedMap}
actualResult := s.JobGroupIsDeploying(tc.inputJob, tc.inputGroup)
assert.Equal(t, tc.expectedResult, actualResult, tc.name)
}
}

func generateStoredMap() map[deploymentsKey]interface{} {
m := make(map[deploymentsKey]interface{})
m[deploymentsKey{job: "test-job-1", group: "test-group-1"}] = nil
return m
}
17 changes: 13 additions & 4 deletions pkg/scale/scale.go
@@ -3,6 +3,7 @@ package scale
import (
"net/http"
"strings"
"sync"

"github.com/hashicorp/nomad/api"
"github.com/jrasell/sherpa/pkg/state"
@@ -18,14 +19,22 @@ type Scaler struct {
nomadClient *api.Client
state scale.Backend
strict bool

deployments map[deploymentsKey]interface{}
deploymentsLock sync.RWMutex
deploymentUpdateChan chan interface{}

shutdownChan chan interface{}
}

func NewScaler(c *api.Client, l zerolog.Logger, state scale.Backend, strictChecking bool) Scale {
return &Scaler{
logger: l,
nomadClient: c,
state: state,
strict: strictChecking,
logger: l,
nomadClient: c,
state: state,
strict: strictChecking,
deployments: make(map[deploymentsKey]interface{}),
deploymentUpdateChan: make(chan interface{}),
}
}

1 change: 1 addition & 0 deletions pkg/scale/v1/consts.go
@@ -19,4 +19,5 @@ const (
var (
errInternalScaleOutNoPolicy = errors.New("scale out forbidden, no scaling policy found")
errInternalScaleInNoPolicy = errors.New("scale in forbidden, no scaling policy found")
errJobGroupInDeployment = errors.New("scale forbidden, job group currently deploying")
)
38 changes: 32 additions & 6 deletions pkg/scale/v1/scale.go
@@ -5,7 +5,6 @@ import (
"net/http"

"github.com/gorilla/mux"
"github.com/hashicorp/nomad/api"
policyBackend "github.com/jrasell/sherpa/pkg/policy/backend"
"github.com/jrasell/sherpa/pkg/scale"
"github.com/jrasell/sherpa/pkg/state"
@@ -22,12 +21,21 @@ type Scale struct {
scaler scale.Scale
}

func NewScaleServer(l zerolog.Logger, strict bool, backend policyBackend.PolicyBackend, state stateBackend.Backend, c *api.Client) *Scale {
// ScaleConfig is a convenience struct for setting up the scale server. These objects are centrally
// built and passed to the server.
type ScaleConfig struct {
Logger zerolog.Logger
Policy policyBackend.PolicyBackend
Scale scale.Scale
State stateBackend.Backend
}

func NewScaleServer(strict bool, cfg *ScaleConfig) *Scale {
return &Scale{
logger: l,
scaler: scale.NewScaler(c, l, state, strict),
policyBackend: backend,
stateBackend: state,
logger: cfg.Logger,
scaler: cfg.Scale,
policyBackend: cfg.Policy,
stateBackend: cfg.State,
strictChecking: strict,
}
}
@@ -39,6 +47,15 @@ func (s *Scale) InJobGroup(w http.ResponseWriter, r *http.Request) {

newReq := &scale.GroupReq{Direction: scale.DirectionIn, GroupName: groupID}

if s.scaler.JobGroupIsDeploying(jobID, groupID) {
s.logger.Info().
Str("job", jobID).
Str("group", groupID).
Msg("job group is currently in deployment and cannot be scaled")
http.Error(w, errJobGroupInDeployment.Error(), http.StatusForbidden)
return
}

pol, err := s.policyBackend.GetJobGroupPolicy(jobID, groupID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -108,6 +125,15 @@ func (s *Scale) OutJobGroup(w http.ResponseWriter, r *http.Request) {

newReq := &scale.GroupReq{Direction: scale.DirectionOut, GroupName: groupID}

if s.scaler.JobGroupIsDeploying(jobID, groupID) {
s.logger.Info().
Str("job", jobID).
Str("group", groupID).
Msg("job group is currently in deployment and cannot be scaled")
http.Error(w, errJobGroupInDeployment.Error(), http.StatusForbidden)
return
}

pol, err := s.policyBackend.GetJobGroupPolicy(jobID, groupID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
7 changes: 6 additions & 1 deletion pkg/server/routes.go
@@ -80,7 +80,12 @@ func (h *HTTPServer) setupUIRoutes() []router.Route {
func (h *HTTPServer) setupScaleRoutes() []router.Route {
h.logger.Debug().Msg("setting up server scale routes")

h.routes.Scale = scaleV1.NewScaleServer(h.logger, h.cfg.Server.StrictPolicyChecking, h.policyBackend, h.stateBackend, h.nomad)
h.routes.Scale = scaleV1.NewScaleServer(h.cfg.Server.StrictPolicyChecking, &scaleV1.ScaleConfig{
Logger: h.logger,
Policy: h.policyBackend,
Scale: h.scaleBackend,
State: h.stateBackend,
})

return router.Routes{
router.Route{
15 changes: 15 additions & 0 deletions pkg/server/server.go
@@ -26,6 +26,8 @@ import (
stateBackend "github.com/jrasell/sherpa/pkg/state/scale"
stateConsul "github.com/jrasell/sherpa/pkg/state/scale/consul"
stateMemory "github.com/jrasell/sherpa/pkg/state/scale/memory"
"github.com/jrasell/sherpa/pkg/watcher"
"github.com/jrasell/sherpa/pkg/watcher/deployment"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
@@ -40,6 +42,9 @@ type HTTPServer struct {
scaleBackend scale.Scale
clusterBackend clusterBackend.Backend

// deploymentWatcher is used to watch deployments in order to update internal tracking.
deploymentWatcher watcher.Watcher

clusterMember *cluster.Member

nomad *api.Client
@@ -78,6 +83,9 @@ func (h *HTTPServer) Start() error {

go h.leaderUpdateHandler()

// Start the deployment watcher, using the scale deployment channel for updates.
go h.deploymentWatcher.Run(h.scaleBackend.GetDeploymentChannel())

h.handleSignals()
return nil
}
@@ -104,6 +112,9 @@ func (h *HTTPServer) setup() error {
h.setupStoredBackends()

h.setupScaler()
go h.scaleBackend.RunDeploymentUpdateHandler()

h.setupDeploymentWatcher()

mem, err := cluster.NewMember(h.logger, h.clusterBackend, h.addr, h.cfg.Cluster.Addr, h.cfg.Cluster.Name)
if err != nil {
@@ -209,6 +220,10 @@ func (h *HTTPServer) setupScaler() {
h.scaleBackend = scale.NewScaler(h.nomad, h.logger, h.stateBackend, h.cfg.Server.StrictPolicyChecking)
}

func (h *HTTPServer) setupDeploymentWatcher() {
h.deploymentWatcher = deployment.New(h.logger, h.nomad)
}

func (h *HTTPServer) setupListener() net.Listener {
var (
err error
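The pkg/watcher/deployment package used above sits outside this diff. As a rough sketch only, and an assumption about its shape rather than the actual implementation, a watcher feeding the scaler's channel could be built on Nomad's blocking deployments list query:

package example

import (
	"time"

	"github.com/hashicorp/nomad/api"
)

// watchDeployments is a hypothetical stand-in for the watcher started in
// HTTPServer.Start: it long-polls the Nomad deployments list endpoint and
// forwards every deployment it sees onto the scaler's update channel.
func watchDeployments(client *api.Client, updateChan chan interface{}) {
	var waitIndex uint64

	for {
		deployments, meta, err := client.Deployments().List(&api.QueryOptions{WaitIndex: waitIndex})
		if err != nil {
			time.Sleep(5 * time.Second)
			continue
		}

		// Skip wake-ups where the cluster state has not advanced.
		if meta.LastIndex <= waitIndex {
			continue
		}
		waitIndex = meta.LastIndex

		for _, d := range deployments {
			updateChan <- d
		}
	}
}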