diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index dcffd015a320..2dffdeb0e0f4 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -25,6 +25,10 @@ import ( func init() { factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities()) + // EqualPriority is a prioritizer function that gives an equal weight of one to all minions + // Register the priority function so that it's available + // but do not include it as part of the default priorities + factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 1) } func defaultPredicates() util.StringSet { @@ -59,14 +63,12 @@ func defaultPriorities() util.StringSet { // spreads pods by minimizing the number of pods (belonging to the same service) on the same minion. 
factory.RegisterPriorityConfigFactory( "ServiceSpreadingPriority", - func(args factory.PluginFactoryArgs) algorithm.PriorityConfig { - return algorithm.PriorityConfig{ - Function: algorithm.NewServiceSpreadPriority(args.ServiceLister), - Weight: 1, - } + factory.PriorityConfigFactory{ + Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction { + return algorithm.NewServiceSpreadPriority(args.ServiceLister) + }, + Weight: 1, }, ), - // EqualPriority is a prioritizer function that gives an equal weight of one to all minions - factory.RegisterPriorityFunction("EqualPriority", algorithm.EqualPriority, 0), ) } diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/factory.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/factory.go index 4fc10b398ad6..30b5dd60f5af 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/factory.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/factory.go @@ -103,6 +103,21 @@ func (f *ConfigFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler return f.CreateFromKeys(predicateKeys, priorityKeys) } +// ReflectorDeletionHook passes all operations through to Store, but calls +// OnDelete in a goroutine if there is a deletion. +type ReflectorDeletionHook struct { + cache.Store + OnDelete func(obj interface{}) +} + +func (r ReflectorDeletionHook) Delete(obj interface{}) error { + go func() { + defer util.HandleCrash() + r.OnDelete(obj) + }() + return r.Store.Delete(obj) +} + // Creates a scheduler from a set of registered fit predicate keys and priority keys. 
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSet) (*scheduler.Config, error) { glog.V(2).Infof("creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys) @@ -125,9 +140,22 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe // Watch and queue pods that need scheduling. cache.NewReflector(f.createUnassignedPodLW(), &api.Pod{}, f.PodQueue, 0).Run() + // Pass through all events to the scheduled pod store, but on a deletion, + // also remove from the assumed pods. + assumedPodDeleter := ReflectorDeletionHook{ + Store: f.ScheduledPodLister.Store, + OnDelete: func(obj interface{}) { + if pod, ok := obj.(*api.Pod); ok { + f.modeler.LockedAction(func() { + f.modeler.ForgetPod(pod) + }) + } + }, + } + // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. - cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.ScheduledPodLister.Store, 0).Run() + cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, assumedPodDeleter, 0).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/plugins.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/plugins.go index 4b0461a75c7d..a8737f5723fe 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/plugins.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory/plugins.go @@ -40,7 +40,13 @@ type PluginFactoryArgs struct { type FitPredicateFactory func(PluginFactoryArgs) algorithm.FitPredicate // A PriorityFunctionFactory produces a PriorityConfig from the given args. 
-type PriorityConfigFactory func(PluginFactoryArgs) algorithm.PriorityConfig +type PriorityFunctionFactory func(PluginFactoryArgs) algorithm.PriorityFunction + +// A PriorityConfigFactory produces a PriorityConfig from the given function and weight +type PriorityConfigFactory struct { + Function PriorityFunctionFactory + Weight int +} var ( schedulerFactoryMutex sync.Mutex @@ -127,8 +133,11 @@ func IsFitPredicateRegistered(name string) bool { // Registers a priority function with the algorithm registry. Returns the name, // with which the function was registered. func RegisterPriorityFunction(name string, function algorithm.PriorityFunction, weight int) string { - return RegisterPriorityConfigFactory(name, func(PluginFactoryArgs) algorithm.PriorityConfig { - return algorithm.PriorityConfig{Function: function, Weight: weight} + return RegisterPriorityConfigFactory(name, PriorityConfigFactory{ + Function: func(PluginFactoryArgs) algorithm.PriorityFunction { + return function + }, + Weight: weight, }) } @@ -143,43 +152,47 @@ func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) strin // Registers a custom priority function with the algorithm registry. // Returns the name, with which the priority function was registered. 
func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string { - var pcf PriorityConfigFactory + var pcf *PriorityConfigFactory validatePriorityOrDie(policy) // generate the priority function, if a custom priority is requested if policy.Argument != nil { if policy.Argument.ServiceAntiAffinity != nil { - pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig { - return algorithm.PriorityConfig{ - Function: algorithm.NewServiceAntiAffinityPriority( + pcf = &PriorityConfigFactory{ + Function: func(args PluginFactoryArgs) algorithm.PriorityFunction { + return algorithm.NewServiceAntiAffinityPriority( args.ServiceLister, policy.Argument.ServiceAntiAffinity.Label, - ), - Weight: policy.Weight, - } + ) + }, + Weight: policy.Weight, } } else if policy.Argument.LabelPreference != nil { - pcf = func(args PluginFactoryArgs) algorithm.PriorityConfig { - return algorithm.PriorityConfig{ - Function: algorithm.NewNodeLabelPriority( + pcf = &PriorityConfigFactory{ + Function: func(args PluginFactoryArgs) algorithm.PriorityFunction { + return algorithm.NewNodeLabelPriority( policy.Argument.LabelPreference.Label, policy.Argument.LabelPreference.Presence, - ), - Weight: policy.Weight, - } + ) + }, + Weight: policy.Weight, } } - } else if _, ok := priorityFunctionMap[policy.Name]; ok { + } else if existing_pcf, ok := priorityFunctionMap[policy.Name]; ok { glog.V(2).Infof("Priority type %s already registered, reusing.", policy.Name) - return policy.Name + // set/update the weight based on the policy + pcf = &PriorityConfigFactory{ + Function: existing_pcf.Function, + Weight: policy.Weight, + } } if pcf == nil { glog.Fatalf("Invalid configuration: Priority type not found for %s", policy.Name) } - return RegisterPriorityConfigFactory(policy.Name, pcf) + return RegisterPriorityConfigFactory(policy.Name, *pcf) } // This check is useful for testing providers. 
@@ -242,7 +255,10 @@ func getPriorityFunctionConfigs(names util.StringSet, args PluginFactoryArgs) ([ if !ok { return nil, fmt.Errorf("Invalid priority name %s specified - no corresponding function found", name) } - configs = append(configs, factory(args)) + configs = append(configs, algorithm.PriorityConfig{ + Function: factory.Function(args), + Weight: factory.Weight, + }) } return configs, nil } diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/modeler.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/modeler.go index 97e0907475ba..7833b5b94b6f 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/modeler.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/modeler.go @@ -19,6 +19,7 @@ package scheduler import ( "fmt" "strings" + "sync" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" @@ -40,9 +41,24 @@ type ExtendedPodLister interface { Exists(pod *api.Pod) (bool, error) } +// actionLocker implements lockedAction (so the fake and SimpleModeler can both +// use it) +type actionLocker struct { + sync.Mutex +} + +// LockedAction serializes calls of whatever is passed as 'do'. +func (a *actionLocker) LockedAction(do func()) { + a.Lock() + defer a.Unlock() + do() +} + // FakeModeler implements the SystemModeler interface. type FakeModeler struct { AssumePodFunc func(pod *api.Pod) + ForgetPodFunc func(pod *api.Pod) + actionLocker } // AssumePod calls the function variable if it is not nil. @@ -52,6 +68,13 @@ func (f *FakeModeler) AssumePod(pod *api.Pod) { } } +// ForgetPod calls the function variable if it is not nil. +func (f *FakeModeler) ForgetPod(pod *api.Pod) { + if f.ForgetPodFunc != nil { + f.ForgetPodFunc(pod) + } +} + // SimpleModeler implements the SystemModeler interface with a timed pod cache. 
type SimpleModeler struct { queuedPods ExtendedPodLister @@ -61,6 +84,8 @@ type SimpleModeler struct { // haven't yet shown up in the scheduledPods variable. // TODO: periodically clear this. assumedPods *cache.StoreToPodLister + + actionLocker } // NewSimpleModeler returns a new SimpleModeler. @@ -78,6 +103,10 @@ func (s *SimpleModeler) AssumePod(pod *api.Pod) { s.assumedPods.Add(pod) } +func (s *SimpleModeler) ForgetPod(pod *api.Pod) { + s.assumedPods.Delete(pod) +} + // Extract names for readable logging. func podNames(pods []api.Pod) []string { out := make([]string, len(pods)) diff --git a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go index 86b45f4322bc..3dcedb8e1d84 100644 --- a/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go +++ b/Godeps/_workspace/src/github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/scheduler.go @@ -44,6 +44,16 @@ type SystemModeler interface { // The assumtion should last until the system confirms the // assumtion or disconfirms it. AssumePod(pod *api.Pod) + // ForgetPod removes a pod assumption. (It won't make the model + // show the absence of the given pod if the pod is in the scheduled + // pods list!) + ForgetPod(pod *api.Pod) + + // For serializing calls to Assume/ForgetPod: imagine you want to add + // a pod iff a bind succeeds, but also remove a pod if it is deleted. + // TODO: if SystemModeler begins modeling things other than pods, this + // should probably be parameterized or specialized for pods. + LockedAction(f func()) } // Scheduler watches for new unscheduled pods. 
It attempts to find @@ -104,16 +114,21 @@ func (s *Scheduler) scheduleOne() { Name: dest, }, } - if err := s.config.Binder.Bind(b); err != nil { - glog.V(1).Infof("Failed to bind pod: %v", err) - s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err) - s.config.Error(pod, err) - return - } - s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest) - // tell the model to assume that this binding took effect. - assumed := *pod - assumed.Spec.Host = dest - assumed.Status.Host = dest - s.config.Modeler.AssumePod(&assumed) + + // We want to add the pod to the model iff the bind succeeds, but we don't want to race + // with any deletions, which happen asynchronously. + s.config.Modeler.LockedAction(func() { + if err := s.config.Binder.Bind(b); err != nil { + glog.V(1).Infof("Failed to bind pod: %v", err) + s.config.Recorder.Eventf(pod, "failedScheduling", "Binding rejected: %v", err) + s.config.Error(pod, err) + return + } + s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest) + // tell the model to assume that this binding took effect. + assumed := *pod + assumed.Spec.Host = dest + assumed.Status.Host = dest + s.config.Modeler.AssumePod(&assumed) + }) }