@@ -35,21 +35,13 @@ import (
3535 "k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
3636 klog "k8s.io/klog/v2"
3737 schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
38- "k8s.io/legacy-cloud-providers/azure/retry"
3938)
4039
4140const (
42- vmInstancesRefreshPeriod = 5 * time .Minute
4341 clusterAutoscalerDeploymentPrefix = `cluster-autoscaler-`
4442 defaultMaxDeploymentsCount = 10
4543)
4644
47- var virtualMachinesStatusCache struct {
48- lastRefresh map [string ]time.Time
49- mutex sync.Mutex
50- virtualMachines map [string ][]compute.VirtualMachine
51- }
52-
5345// AgentPool implements NodeGroup interface for agent pools deployed by aks-engine.
5446type AgentPool struct {
5547 azureRef
@@ -132,54 +124,24 @@ func (as *AgentPool) MaxSize() int {
132124 return as .maxSize
133125}
134126
135- func (as * AgentPool ) getVirtualMachinesFromCache () ([]compute.VirtualMachine , error ) {
136- virtualMachinesStatusCache .mutex .Lock ()
137- defer virtualMachinesStatusCache .mutex .Unlock ()
138- klog .V (4 ).Infof ("getVirtualMachinesFromCache: starts for %+v" , as )
139-
140- if virtualMachinesStatusCache .virtualMachines == nil {
141- klog .V (4 ).Infof ("getVirtualMachinesFromCache: initialize vm cache" )
142- virtualMachinesStatusCache .virtualMachines = make (map [string ][]compute.VirtualMachine )
143- }
144- if virtualMachinesStatusCache .lastRefresh == nil {
145- klog .V (4 ).Infof ("getVirtualMachinesFromCache: initialize last refresh time cache" )
146- virtualMachinesStatusCache .lastRefresh = make (map [string ]time.Time )
147- }
148-
149- if virtualMachinesStatusCache .lastRefresh [as .Id ()].Add (vmInstancesRefreshPeriod ).After (time .Now ()) {
150- klog .V (4 ).Infof ("getVirtualMachinesFromCache: get vms from cache" )
151- return virtualMachinesStatusCache .virtualMachines [as .Id ()], nil
152- }
153- klog .V (4 ).Infof ("getVirtualMachinesFromCache: get vms from API" )
154- vms , rerr := as .GetVirtualMachines ()
155- klog .V (4 ).Infof ("getVirtualMachinesFromCache: got vms from API, len = %d" , len (vms ))
156-
157- if rerr != nil {
158- if isAzureRequestsThrottled (rerr ) {
159- klog .Warningf ("getAllVirtualMachines: throttling with message %v, would return the cached vms" , rerr )
160- return virtualMachinesStatusCache .virtualMachines [as .Id ()], nil
161- }
162-
163- return []compute.VirtualMachine {}, rerr .Error ()
164- }
165-
166- virtualMachinesStatusCache .virtualMachines [as .Id ()] = vms
167- virtualMachinesStatusCache .lastRefresh [as .Id ()] = time .Now ()
168-
169- return vms , nil
127+ // Id returns AgentPool id.
128+ func (as * AgentPool ) Id () string {
129+ return as .Name
170130}
171131
172- func invalidateVMCache (agentpoolName string ) {
173- virtualMachinesStatusCache .mutex .Lock ()
174- virtualMachinesStatusCache .lastRefresh [agentpoolName ] = time .Now ().Add (- 1 * vmInstancesRefreshPeriod )
175- virtualMachinesStatusCache .mutex .Unlock ()
132+ func (as * AgentPool ) getVMsFromCache () ([]compute.VirtualMachine , error ) {
133+ allVMs := as .manager .azureCache .getVirtualMachines ()
134+ if _ , exists := allVMs [as .Name ]; ! exists {
135+ return []compute.VirtualMachine {}, fmt .Errorf ("could not find VMs with poolName: %s" , as .Name )
136+ }
137+ return allVMs [as .Name ], nil
176138}
177139
178140// GetVMIndexes gets indexes of all virtual machines belonging to the agent pool.
179141func (as * AgentPool ) GetVMIndexes () ([]int , map [int ]string , error ) {
180142 klog .V (6 ).Infof ("GetVMIndexes: starts for as %v" , as )
181143
182- instances , err := as .getVirtualMachinesFromCache ()
144+ instances , err := as .getVMsFromCache ()
183145 if err != nil {
184146 return nil , nil , err
185147 }
@@ -222,8 +184,8 @@ func (as *AgentPool) getCurSize() (int64, error) {
222184 klog .V (5 ).Infof ("Returning agent pool (%q) size: %d\n " , as .Name , len (indexes ))
223185
224186 if as .curSize != int64 (len (indexes )) {
225- klog .V (6 ).Infof ("getCurSize:as.curSize(%d) != real size (%d), invalidating vm cache" , as .curSize , len (indexes ))
226- invalidateVMCache ( as .Id () )
187+ klog .V (6 ).Infof ("getCurSize:as.curSize(%d) != real size (%d), invalidating cache" , as .curSize , len (indexes ))
188+ as .manager . invalidateCache ( )
227189 }
228190
229191 as .curSize = int64 (len (indexes ))
@@ -316,8 +278,8 @@ func (as *AgentPool) IncreaseSize(delta int) error {
316278 klog .Warningf ("IncreaseSize: failed to cleanup outdated deployments with err: %v." , err )
317279 }
318280
319- klog .V (6 ).Infof ("IncreaseSize: invalidating vm cache" )
320- invalidateVMCache ( as .Id () )
281+ klog .V (6 ).Infof ("IncreaseSize: invalidating cache" )
282+ as .manager . invalidateCache ( )
321283
322284 indexes , _ , err := as .GetVMIndexes ()
323285 if err != nil {
@@ -357,43 +319,15 @@ func (as *AgentPool) IncreaseSize(delta int) error {
357319 // Update cache after scale success.
358320 as .curSize = int64 (expectedSize )
359321 as .lastRefresh = time .Now ()
360- klog .V (6 ).Info ("IncreaseSize: invalidating vm cache" )
361- invalidateVMCache ( as .Id () )
322+ klog .V (6 ).Info ("IncreaseSize: invalidating cache" )
323+ as .manager . invalidateCache ( )
362324 return nil
363325 }
364326
365327 klog .Errorf ("deploymentsClient.CreateOrUpdate for deployment %q failed: %v" , newDeploymentName , realError )
366328 return realError
367329}
368330
369- // GetVirtualMachines returns list of nodes for the given agent pool.
370- func (as * AgentPool ) GetVirtualMachines () ([]compute.VirtualMachine , * retry.Error ) {
371- ctx , cancel := getContextWithCancel ()
372- defer cancel ()
373-
374- result , rerr := as .manager .azClient .virtualMachinesClient .List (ctx , as .manager .config .ResourceGroup )
375- if rerr != nil {
376- return nil , rerr
377- }
378-
379- instances := make ([]compute.VirtualMachine , 0 )
380- for _ , instance := range result {
381- if instance .Tags == nil {
382- continue
383- }
384-
385- tags := instance .Tags
386- vmPoolName := tags ["poolName" ]
387- if vmPoolName == nil || ! strings .EqualFold (* vmPoolName , as .Id ()) {
388- continue
389- }
390-
391- instances = append (instances , instance )
392- }
393-
394- return instances , nil
395- }
396-
397331// DecreaseTargetSize decreases the target size of the node group. This function
398332// doesn't permit to delete any existing node and can be used only to reduce the
399333// request for new nodes that have not been yet fulfilled. Delta should be negative.
@@ -403,7 +337,7 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error {
403337 as .mutex .Lock ()
404338 defer as .mutex .Unlock ()
405339
406- nodes , err := as .getVirtualMachinesFromCache ()
340+ nodes , err := as .getVMsFromCache ()
407341 if err != nil {
408342 return err
409343 }
@@ -427,14 +361,14 @@ func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) {
427361 Name : node .Spec .ProviderID ,
428362 }
429363
430- targetAsg , err := as .manager .GetAsgForInstance (ref )
364+ targetAsg , err := as .manager .GetNodeGroupForInstance (ref )
431365 if err != nil {
432366 return false , err
433367 }
434368 if targetAsg == nil {
435369 return false , fmt .Errorf ("%s doesn't belong to a known agent pool" , node .Name )
436370 }
437- if ! strings .EqualFold (targetAsg .Id (), as .Id () ) {
371+ if ! strings .EqualFold (targetAsg .Id (), as .Name ) {
438372 return false , nil
439373 }
440374 return true , nil
@@ -446,13 +380,13 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
446380 return nil
447381 }
448382
449- commonAsg , err := as .manager .GetAsgForInstance (instances [0 ])
383+ commonAsg , err := as .manager .GetNodeGroupForInstance (instances [0 ])
450384 if err != nil {
451385 return err
452386 }
453387
454388 for _ , instance := range instances {
455- asg , err := as .manager .GetAsgForInstance (instance )
389+ asg , err := as .manager .GetNodeGroupForInstance (instance )
456390 if err != nil {
457391 return err
458392 }
@@ -476,8 +410,8 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
476410 }
477411 }
478412
479- klog .V (6 ).Infof ("DeleteInstances: invalidating vm cache" )
480- invalidateVMCache ( as .Id () )
413+ klog .V (6 ).Infof ("DeleteInstances: invalidating cache" )
414+ as .manager . invalidateCache ( )
481415 return nil
482416}
483417
@@ -501,7 +435,7 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error {
501435 }
502436
503437 if belongs != true {
504- return fmt .Errorf ("%s belongs to a different asg than %s" , node .Name , as .Id () )
438+ return fmt .Errorf ("%s belongs to a different asg than %s" , node .Name , as .Name )
505439 }
506440
507441 ref := & azureRef {
@@ -518,14 +452,9 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error {
518452 return as .DeleteInstances (refs )
519453}
520454
521- // Id returns AgentPool id.
522- func (as * AgentPool ) Id () string {
523- return as .Name
524- }
525-
526455// Debug returns a debug string for the agent pool.
527456func (as * AgentPool ) Debug () string {
528- return fmt .Sprintf ("%s (%d:%d)" , as .Id () , as .MinSize (), as .MaxSize ())
457+ return fmt .Sprintf ("%s (%d:%d)" , as .Name , as .MinSize (), as .MaxSize ())
529458}
530459
531460// TemplateNodeInfo returns a node template for this agent pool.
@@ -535,7 +464,7 @@ func (as *AgentPool) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
535464
536465// Nodes returns a list of all nodes that belong to this node group.
537466func (as * AgentPool ) Nodes () ([]cloudprovider.Instance , error ) {
538- instances , err := as .getVirtualMachinesFromCache ()
467+ instances , err := as .getVMsFromCache ()
539468 if err != nil {
540469 return nil , err
541470 }
0 commit comments