From 7a650beb16fdbbd70b58219ab70dda3c7f2df699 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 15 Jan 2018 17:16:31 +0800 Subject: [PATCH 1/5] Add azure node group caches --- .../cloudprovider/azure/azure_cache.go | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_cache.go diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cache.go b/cluster-autoscaler/cloudprovider/azure/azure_cache.go new file mode 100644 index 000000000000..2872fa810909 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_cache.go @@ -0,0 +1,182 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// Asg is a wrapper over NodeGroup interface. +type Asg interface { + cloudprovider.NodeGroup + + getAzureRef() azureRef + getInstanceIDs() (map[azureRef]string, error) +} + +type asgCache struct { + registeredAsgs []Asg + instanceToAsg map[azureRef]Asg + instanceToID map[azureRef]string + notInRegisteredAsg map[azureRef]bool + mutex sync.Mutex + interrupt chan struct{} +} + +func newAsgCache() (*asgCache, error) { + cache := &asgCache{ + registeredAsgs: make([]Asg, 0), + instanceToAsg: make(map[azureRef]Asg), + instanceToID: make(map[azureRef]string), + notInRegisteredAsg: make(map[azureRef]bool), + interrupt: make(chan struct{}), + } + + go wait.Until(func() { + cache.mutex.Lock() + defer cache.mutex.Unlock() + if err := cache.regenerate(); err != nil { + glog.Errorf("Error while regenerating Asg cache: %v", err) + } + }, time.Hour, cache.interrupt) + + return cache, nil +} + +// Register registers a node group if it hasn't been registered. +func (m *asgCache) Register(asg Asg) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + for i := range m.registeredAsgs { + if existing := m.registeredAsgs[i]; existing.Id() == asg.Id() { + if reflect.DeepEqual(existing, asg) { + return false + } + + m.registeredAsgs[i] = asg + glog.V(4).Infof("ASG %q updated", asg.Id()) + m.invalidateUnownedInstanceCache() + return true + } + } + + glog.V(4).Infof("Registering ASG %q", asg.Id()) + m.registeredAsgs = append(m.registeredAsgs, asg) + m.invalidateUnownedInstanceCache() + return true +} + +func (m *asgCache) invalidateUnownedInstanceCache() { + glog.V(4).Info("Invalidating unowned instance cache") + m.notInRegisteredAsg = make(map[azureRef]bool) +} + +// Unregister ASG. Returns true if the ASG was unregistered. +func (m *asgCache) Unregister(asg Asg) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + + updated := make([]Asg, 0, len(m.registeredAsgs)) + changed := false + for _, existing := range m.registeredAsgs { + if existing.Id() == asg.Id() { + glog.V(1).Infof("Unregistered ASG %s", asg.Id()) + changed = true + continue + } + updated = append(updated, existing) + } + m.registeredAsgs = updated + return changed +} + +func (m *asgCache) get() []Asg { + m.mutex.Lock() + defer m.mutex.Unlock() + + return m.registeredAsgs +} + +func (m *asgCache) getInstanceIDs(instances []*azureRef) []string { + m.mutex.Lock() + defer m.mutex.Unlock() + + instanceIds := make([]string, len(instances)) + for i, instance := range instances { + instanceIds[i] = m.instanceToID[*instance] + } + + return instanceIds +} + +// FindForInstance returns Asg of the given Instance +func (m *asgCache) FindForInstance(instance *azureRef) (Asg, error) { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.notInRegisteredAsg[*instance] { + // We already know we don't own this instance. Return early and avoid + // additional calls. + return nil, nil + } + + if asg, found := m.instanceToAsg[*instance]; found { + return asg, nil + } + + if err := m.regenerate(); err != nil { + return nil, fmt.Errorf("Error while looking for ASG for instance %+v, error: %v", *instance, err) + } + if config, found := m.instanceToAsg[*instance]; found { + return config, nil + } + + m.notInRegisteredAsg[*instance] = true + return nil, nil +} + +// Cleanup closes the channel to signal the go routine to stop that is handling the cache +func (m *asgCache) Cleanup() { + close(m.interrupt) +} + +func (m *asgCache) regenerate() error { + newCache := make(map[azureRef]Asg) + + for _, nsg := range m.registeredAsgs { + instances, err := nsg.Nodes() + if err != nil { + return err + } + + for _, instance := range instances { + ref := azureRef{Name: instance} + newCache[ref] = nsg + } + } + + m.instanceToAsg = newCache + return nil +} From 6d65339956ceb91507c01a4daae4f2a136767306 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 15 Jan 2018 17:16:58 +0800 Subject: [PATCH 2/5] Fix azure client fakes --- .../cloudprovider/azure/azure_fakes.go | 79 +++++++++++++++---- 1 file changed, 65 insertions(+), 14 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_fakes.go b/cluster-autoscaler/cloudprovider/azure/azure_fakes.go index 7f418e2742b7..bb8b016b4764 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_fakes.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_fakes.go @@ -29,9 +29,15 @@ import ( "github.com/stretchr/testify/mock" ) +const ( + fakeVirtualMachineScaleSetVMID = "/subscriptions/test-subscrition-id/resourceGroups/test-asg/providers/Microsoft.Compute/virtualMachineScaleSets/agents/virtualMachines/0" +) + // VirtualMachineScaleSetsClientMock mocks for VirtualMachineScaleSetsClient. type VirtualMachineScaleSetsClientMock struct { mock.Mock + mutex sync.Mutex + FakeStore map[string]map[string]compute.VirtualMachineScaleSet } // Get gets the VirtualMachineScaleSet by vmScaleSetName. @@ -49,13 +55,31 @@ func (client *VirtualMachineScaleSetsClientMock) Get(resourceGroupName string, } // CreateOrUpdate creates or updates the VirtualMachineScaleSet. -func (client *VirtualMachineScaleSetsClientMock) CreateOrUpdate( - resourceGroupName string, vmScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) { - errChan := make(chan error) - go func() { - errChan <- nil +func (client *VirtualMachineScaleSetsClientMock) CreateOrUpdate(resourceGroupName string, VMScaleSetName string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) { + client.mutex.Lock() + defer client.mutex.Unlock() + + resultChan := make(chan compute.VirtualMachineScaleSet, 1) + errChan := make(chan error, 1) + var result compute.VirtualMachineScaleSet + var err error + defer func() { + resultChan <- result + errChan <- err + close(resultChan) + close(errChan) }() - return nil, errChan + + if _, ok := client.FakeStore[resourceGroupName]; !ok { + client.FakeStore[resourceGroupName] = make(map[string]compute.VirtualMachineScaleSet) + } + client.FakeStore[resourceGroupName][VMScaleSetName] = parameters + result = client.FakeStore[resourceGroupName][VMScaleSetName] + result.Response.Response = &http.Response{ + StatusCode: http.StatusOK, + } + err = nil + return resultChan, errChan } // DeleteInstances deletes a set of instances for specified VirtualMachineScaleSet. @@ -69,6 +93,31 @@ func (client *VirtualMachineScaleSetsClientMock) DeleteInstances(resourceGroupNa return nil, errChan } +// List get a list of VirtualMachineScaleSets. +func (client *VirtualMachineScaleSetsClientMock) List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error) { + client.mutex.Lock() + defer client.mutex.Unlock() + + value := []compute.VirtualMachineScaleSet{} + if _, ok := client.FakeStore[resourceGroupName]; ok { + for _, v := range client.FakeStore[resourceGroupName] { + value = append(value, v) + } + } + + result.Response.Response = &http.Response{ + StatusCode: http.StatusOK, + } + result.NextLink = nil + result.Value = &value + return result, nil +} + +// ListNextResults gets more results of VirtualMachineScaleSets. +func (client *VirtualMachineScaleSetsClientMock) ListNextResults(lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) { + return result, nil +} + // VirtualMachineScaleSetVMsClientMock mocks for VirtualMachineScaleSetVMsClient. type VirtualMachineScaleSetVMsClientMock struct { mock.Mock @@ -77,13 +126,15 @@ type VirtualMachineScaleSetVMsClientMock struct { // List gets a list of VirtualMachineScaleSetVMs. func (m *VirtualMachineScaleSetVMsClientMock) List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) { value := make([]compute.VirtualMachineScaleSetVM, 1) - vmInstanceID := "test-instance-id" - properties := compute.VirtualMachineScaleSetVMProperties{} + ID := fakeVirtualMachineScaleSetVMID + instanceID := "0" vmID := "123E4567-E89B-12D3-A456-426655440000" - properties.VMID = &vmID + properties := compute.VirtualMachineScaleSetVMProperties{ + VMID: &vmID, + } value[0] = compute.VirtualMachineScaleSetVM{ - ID: &vmID, - InstanceID: &vmInstanceID, + ID: &ID, + InstanceID: &instanceID, VirtualMachineScaleSetVMProperties: &properties, } @@ -94,14 +145,14 @@ func (m *VirtualMachineScaleSetVMsClientMock) List(resourceGroupName string, vir // ListNextResults gets more results from previous VirtualMachineScaleSetVMListResult. func (m *VirtualMachineScaleSetVMsClientMock) ListNextResults(lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) { - return result, nil + return compute.VirtualMachineScaleSetVMListResult{Value: nil}, nil } // VirtualMachinesClientMock mocks for VirtualMachinesClient. type VirtualMachinesClientMock struct { mock.Mock - mutex *sync.Mutex + mutex sync.Mutex FakeStore map[string]map[string]compute.VirtualMachine } @@ -228,7 +279,7 @@ func (m *AccountsClientMock) ListKeys(resourceGroupName string, accountName stri type DeploymentsClientMock struct { mock.Mock - mutex *sync.Mutex + mutex sync.Mutex FakeStore map[string]resources.DeploymentExtended } From b0c152a36e9d9b66c95a8ef2def3c5e494f7ba0a Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 15 Jan 2018 17:17:27 +0800 Subject: [PATCH 3/5] Wrap all clients to azClient --- .../cloudprovider/azure/azure_client.go | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 cluster-autoscaler/cloudprovider/azure/azure_client.go diff --git a/cluster-autoscaler/cloudprovider/azure/azure_client.go b/cluster-autoscaler/cloudprovider/azure/azure_client.go new file mode 100644 index 000000000000..8a32095c2daa --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_client.go @@ -0,0 +1,200 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "io/ioutil" + "time" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/disk" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/azure-sdk-for-go/arm/resources/resources" + "github.com/Azure/azure-sdk-for-go/arm/storage" + "github.com/Azure/go-autorest/autorest" + "github.com/Azure/go-autorest/autorest/adal" + "github.com/Azure/go-autorest/autorest/azure" + "github.com/golang/glog" +) + +// VirtualMachineScaleSetsClient defines needed functions for azure compute.VirtualMachineScaleSetsClient. +type VirtualMachineScaleSetsClient interface { + Get(resourceGroupName string, vmScaleSetName string) (result compute.VirtualMachineScaleSet, err error) + CreateOrUpdate(resourceGroupName string, name string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) + DeleteInstances(resourceGroupName string, vmScaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) + List(resourceGroupName string) (result compute.VirtualMachineScaleSetListResult, err error) + ListNextResults(lastResults compute.VirtualMachineScaleSetListResult) (result compute.VirtualMachineScaleSetListResult, err error) +} + +// VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient. +type VirtualMachineScaleSetVMsClient interface { + List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) + ListNextResults(lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) +} + +// VirtualMachinesClient defines needed functions for azure compute.VirtualMachinesClient. +type VirtualMachinesClient interface { + Get(resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error) + Delete(resourceGroupName string, VMName string, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) + List(resourceGroupName string) (result compute.VirtualMachineListResult, err error) + ListNextResults(lastResults compute.VirtualMachineListResult) (result compute.VirtualMachineListResult, err error) +} + +// InterfacesClient defines needed functions for azure network.InterfacesClient. +type InterfacesClient interface { + Delete(resourceGroupName string, networkInterfaceName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) +} + +// DeploymentsClient defines needed functions for azure network.DeploymentsClient. +type DeploymentsClient interface { + Get(resourceGroupName string, deploymentName string) (result resources.DeploymentExtended, err error) + ExportTemplate(resourceGroupName string, deploymentName string) (result resources.DeploymentExportResult, err error) + CreateOrUpdate(resourceGroupName string, deploymentName string, parameters resources.Deployment, cancel <-chan struct{}) (<-chan resources.DeploymentExtended, <-chan error) +} + +// DisksClient defines needed functions for azure disk.DisksClient. +type DisksClient interface { + Delete(resourceGroupName string, diskName string, cancel <-chan struct{}) (<-chan disk.OperationStatusResponse, <-chan error) +} + +// AccountsClient defines needed functions for azure storage.AccountsClient. +type AccountsClient interface { + ListKeys(resourceGroupName string, accountName string) (result storage.AccountListKeysResult, err error) +} + +type azClient struct { + virtualMachineScaleSetsClient VirtualMachineScaleSetsClient + virtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient + virtualMachinesClient VirtualMachinesClient + deploymentsClient DeploymentsClient + interfacesClient InterfacesClient + disksClient DisksClient + storageAccountsClient AccountsClient +} + +// newServicePrincipalTokenFromCredentials creates a new ServicePrincipalToken using values of the +// passed credentials map. +func newServicePrincipalTokenFromCredentials(config *Config, env *azure.Environment) (*adal.ServicePrincipalToken, error) { + oauthConfig, err := adal.NewOAuthConfig(env.ActiveDirectoryEndpoint, config.TenantID) + if err != nil { + return nil, fmt.Errorf("creating the OAuth config: %v", err) + } + + if config.UseManagedIdentityExtension { + glog.V(2).Infoln("azure: using managed identity extension to retrieve access token") + msiEndpoint, err := adal.GetMSIVMEndpoint() + if err != nil { + return nil, fmt.Errorf("Getting the managed service identity endpoint: %v", err) + } + return adal.NewServicePrincipalTokenFromMSI( + msiEndpoint, + env.ServiceManagementEndpoint) + } + + if len(config.AADClientSecret) > 0 { + glog.V(2).Infoln("azure: using client_id+client_secret to retrieve access token") + return adal.NewServicePrincipalToken( + *oauthConfig, + config.AADClientID, + config.AADClientSecret, + env.ServiceManagementEndpoint) + } + + if len(config.AADClientCertPath) > 0 && len(config.AADClientCertPassword) > 0 { + glog.V(2).Infoln("azure: using jwt client_assertion (client_cert+client_private_key) to retrieve access token") + certData, err := ioutil.ReadFile(config.AADClientCertPath) + if err != nil { + return nil, fmt.Errorf("reading the client certificate from file %s: %v", config.AADClientCertPath, err) + } + certificate, privateKey, err := decodePkcs12(certData, config.AADClientCertPassword) + if err != nil { + return nil, fmt.Errorf("decoding the client certificate: %v", err) + } + return adal.NewServicePrincipalTokenFromCertificate( + *oauthConfig, + config.AADClientID, + certificate, + privateKey, + env.ServiceManagementEndpoint) + } + + return nil, fmt.Errorf("No credentials provided for AAD application %s", config.AADClientID) +} + +func newAzClient(cfg *Config, env *azure.Environment) (*azClient, error) { + spt, err := newServicePrincipalTokenFromCredentials(cfg, env) + if err != nil { + return nil, err + } + + scaleSetsClient := compute.NewVirtualMachineScaleSetsClient(cfg.SubscriptionID) + scaleSetsClient.BaseURI = env.ResourceManagerEndpoint + scaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(spt) + scaleSetsClient.PollingDelay = 5 * time.Second + configureUserAgent(&scaleSetsClient.Client) + glog.V(5).Infof("Created scale set client with authorizer: %v", scaleSetsClient) + + scaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(cfg.SubscriptionID) + scaleSetVMsClient.BaseURI = env.ResourceManagerEndpoint + scaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(spt) + scaleSetVMsClient.PollingDelay = 5 * time.Second + configureUserAgent(&scaleSetVMsClient.Client) + glog.V(5).Infof("Created scale set vm client with authorizer: %v", scaleSetVMsClient) + + virtualMachinesClient := compute.NewVirtualMachinesClient(cfg.SubscriptionID) + virtualMachinesClient.BaseURI = env.ResourceManagerEndpoint + virtualMachinesClient.Authorizer = autorest.NewBearerAuthorizer(spt) + virtualMachinesClient.PollingDelay = 5 * time.Second + configureUserAgent(&virtualMachinesClient.Client) + glog.V(5).Infof("Created vm client with authorizer: %v", virtualMachinesClient) + + deploymentsClient := resources.NewDeploymentsClient(cfg.SubscriptionID) + deploymentsClient.BaseURI = env.ResourceManagerEndpoint + deploymentsClient.Authorizer = autorest.NewBearerAuthorizer(spt) + deploymentsClient.PollingDelay = 5 * time.Second + configureUserAgent(&deploymentsClient.Client) + glog.V(5).Infof("Created deployments client with authorizer: %v", deploymentsClient) + + interfacesClient := network.NewInterfacesClient(cfg.SubscriptionID) + interfacesClient.BaseURI = env.ResourceManagerEndpoint + interfacesClient.Authorizer = autorest.NewBearerAuthorizer(spt) + interfacesClient.PollingDelay = 5 * time.Second + glog.V(5).Infof("Created interfaces client with authorizer: %v", interfacesClient) + + storageAccountsClient := storage.NewAccountsClient(cfg.SubscriptionID) + storageAccountsClient.BaseURI = env.ResourceManagerEndpoint + storageAccountsClient.Authorizer = autorest.NewBearerAuthorizer(spt) + storageAccountsClient.PollingDelay = 5 * time.Second + glog.V(5).Infof("Created storage accounts client with authorizer: %v", storageAccountsClient) + + disksClient := disk.NewDisksClient(cfg.SubscriptionID) + disksClient.BaseURI = env.ResourceManagerEndpoint + disksClient.Authorizer = autorest.NewBearerAuthorizer(spt) + disksClient.PollingDelay = 5 * time.Second + glog.V(5).Infof("Created disks client with authorizer: %v", disksClient) + + return &azClient{ + disksClient: disksClient, + interfacesClient: interfacesClient, + virtualMachineScaleSetsClient: scaleSetsClient, + virtualMachineScaleSetVMsClient: scaleSetVMsClient, + deploymentsClient: deploymentsClient, + virtualMachinesClient: virtualMachinesClient, + storageAccountsClient: storageAccountsClient, + }, nil +} From 1499b5b51a25ba9918c82053ce3e310ba298e47f Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 15 Jan 2018 17:18:27 +0800 Subject: [PATCH 4/5] Add support for multipe node groups in Azure --- .../cloudprovider/azure/azure_agent_pool.go | 121 ++-- .../azure/azure_cloud_provider.go | 85 +-- .../cloudprovider/azure/azure_manager.go | 546 +++++++----------- .../cloudprovider/azure/azure_scale_set.go | 115 ++-- .../cloudprovider/azure/azure_util.go | 62 ++ .../builder/cloud_provider_builder.go | 4 +- .../node_group_discovery_options.go | 54 +- 7 files changed, 478 insertions(+), 509 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go index 241cade519a4..3cba1b04eac3 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go @@ -32,13 +32,14 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // AgentPool implements NodeGroup interface for agent pools deployed by acs-engine. type AgentPool struct { - AzureRef - *AzureManager + azureRef + manager *AzureManager minSize int maxSize int @@ -58,15 +59,15 @@ type VirtualMachineID struct { } // NewAgentPool creates a new AgentPool. -func NewAgentPool(name string, minSize, maxSize int, az *AzureManager) (*AgentPool, error) { +func NewAgentPool(spec *dynamic.NodeGroupSpec, az *AzureManager) (*AgentPool, error) { as := &AgentPool{ - AzureRef: AzureRef{ - Name: name, + azureRef: azureRef{ + Name: spec.Name, }, - minSize: minSize, - maxSize: maxSize, - targetSize: -1, - AzureManager: az, + minSize: spec.MinSize, + maxSize: spec.MaxSize, + targetSize: -1, + manager: az, } if err := as.initialize(); err != nil { @@ -77,15 +78,15 @@ func NewAgentPool(name string, minSize, maxSize int, az *AzureManager) (*AgentPo } func (as *AgentPool) initialize() error { - deploy, err := as.deploymentsClient.Get(as.config.ResourceGroup, as.config.Deployment) + deploy, err := as.manager.azClient.deploymentsClient.Get(as.manager.config.ResourceGroup, as.manager.config.Deployment) if err != nil { - glog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", as.config.ResourceGroup, as.config.Deployment, err) + glog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", as.manager.config.ResourceGroup, as.manager.config.Deployment, err) return err } - template, err := as.deploymentsClient.ExportTemplate(as.config.ResourceGroup, as.config.Deployment) + template, err := as.manager.azClient.deploymentsClient.ExportTemplate(as.manager.config.ResourceGroup, as.manager.config.Deployment) if err != nil { - glog.Errorf("deploymentsClient.ExportTemplate(%s, %s) failed: %v", as.config.ResourceGroup, as.config.Deployment, err) + glog.Errorf("deploymentsClient.ExportTemplate(%s, %s) failed: %v", as.manager.config.ResourceGroup, as.manager.config.Deployment, err) return err } @@ -105,14 +106,14 @@ func (as *AgentPool) preprocessParameters() { } // fulfill secure parameters. - as.parameters["apiServerPrivateKey"] = map[string]string{"value": as.config.APIServerPrivateKey} - as.parameters["caPrivateKey"] = map[string]string{"value": as.config.CAPrivateKey} - as.parameters["clientPrivateKey"] = map[string]string{"value": as.config.ClientPrivateKey} - as.parameters["kubeConfigPrivateKey"] = map[string]string{"value": as.config.KubeConfigPrivateKey} - as.parameters["servicePrincipalClientId"] = map[string]string{"value": as.config.AADClientID} - as.parameters["servicePrincipalClientSecret"] = map[string]string{"value": as.config.AADClientSecret} - if as.config.WindowsAdminPassword != "" { - as.parameters["windowsAdminPassword"] = map[string]string{"value": as.config.WindowsAdminPassword} + as.parameters["apiServerPrivateKey"] = map[string]string{"value": as.manager.config.APIServerPrivateKey} + as.parameters["caPrivateKey"] = map[string]string{"value": as.manager.config.CAPrivateKey} + as.parameters["clientPrivateKey"] = map[string]string{"value": as.manager.config.ClientPrivateKey} + as.parameters["kubeConfigPrivateKey"] = map[string]string{"value": as.manager.config.KubeConfigPrivateKey} + as.parameters["servicePrincipalClientId"] = map[string]string{"value": as.manager.config.AADClientID} + as.parameters["servicePrincipalClientSecret"] = map[string]string{"value": as.manager.config.AADClientSecret} + if as.manager.config.WindowsAdminPassword != "" { + as.parameters["windowsAdminPassword"] = map[string]string{"value": as.manager.config.WindowsAdminPassword} } } @@ -231,14 +232,14 @@ func (as *AgentPool) IncreaseSize(delta int) error { Mode: resources.Incremental, }, } - _, errChan := as.deploymentsClient.CreateOrUpdate(as.config.ResourceGroup, newDeploymentName, newDeployment, cancel) - glog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %s)", as.config.ResourceGroup, newDeploymentName, newDeployment) + _, errChan := as.manager.azClient.deploymentsClient.CreateOrUpdate(as.manager.config.ResourceGroup, newDeploymentName, newDeployment, cancel) + glog.V(3).Infof("Waiting for deploymentsClient.CreateOrUpdate(%s, %s, %s)", as.manager.config.ResourceGroup, newDeploymentName, newDeployment) return <-errChan } // GetVirtualMachines returns list of nodes for the given agent pool. func (as *AgentPool) GetVirtualMachines() (instances []compute.VirtualMachine, err error) { - result, err := as.virtualMachinesClient.List(as.config.ResourceGroup) + result, err := as.manager.azClient.virtualMachinesClient.List(as.manager.config.ResourceGroup) if err != nil { return nil, err } @@ -261,7 +262,7 @@ func (as *AgentPool) GetVirtualMachines() (instances []compute.VirtualMachine, e moreResult = false if result.NextLink != nil { - result, err = as.virtualMachinesClient.ListNextResults(result) + result, err = as.manager.azClient.virtualMachinesClient.ListNextResults(result) if err != nil { glog.Errorf("virtualMachinesClient.ListNextResults failed: %v", err) return nil, err @@ -302,11 +303,11 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error { func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) { glog.V(6).Infof("Check if node belongs to this agent pool: AgentPool:%v, node:%v\n", as, node) - ref := &AzureRef{ + ref := &azureRef{ Name: strings.ToLower(node.Spec.ProviderID), } - targetAsg, err := as.GetNodeGroupForInstance(ref) + targetAsg, err := as.manager.GetAsgForInstance(ref) if err != nil { return false, err } @@ -320,18 +321,18 @@ func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) { } // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. -func (as *AgentPool) DeleteInstances(instances []*AzureRef) error { +func (as *AgentPool) DeleteInstances(instances []*azureRef) error { if len(instances) == 0 { return nil } - commonAsg, err := as.GetNodeGroupForInstance(instances[0]) + commonAsg, err := as.manager.GetAsgForInstance(instances[0]) if err != nil { return err } for _, instance := range instances { - asg, err := as.GetNodeGroupForInstance(instance) + asg, err := as.manager.GetAsgForInstance(instance) if err != nil { return err } @@ -370,7 +371,7 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error { return fmt.Errorf("min size reached, nodes will not be deleted") } - refs := make([]*AzureRef, 0, len(nodes)) + refs := make([]*azureRef, 0, len(nodes)) for _, node := range nodes { belongs, err := as.Belongs(node) if err != nil { @@ -381,10 +382,10 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error { return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Id()) } - azureRef := &AzureRef{ + ref := &azureRef{ Name: strings.ToLower(node.Spec.ProviderID), } - refs = append(refs, azureRef) + refs = append(refs, ref) } return as.DeleteInstances(refs) @@ -423,13 +424,13 @@ func (as *AgentPool) Nodes() ([]string, error) { } func (as *AgentPool) deleteBlob(accountName, vhdContainer, vhdBlob string) error { - storageKeysResult, err := as.storageAccountsClient.ListKeys(as.config.ResourceGroup, accountName) + storageKeysResult, err := as.manager.azClient.storageAccountsClient.ListKeys(as.manager.config.ResourceGroup, accountName) if err != nil { return err } keys := *storageKeysResult.Keys - client, err := azStorage.NewBasicClientOnSovereignCloud(accountName, to.String(keys[0].Value), as.env) + client, err := azStorage.NewBasicClientOnSovereignCloud(accountName, to.String(keys[0].Value), as.manager.env) if err != nil { return err } @@ -443,16 +444,16 @@ func (as *AgentPool) deleteBlob(accountName, vhdContainer, vhdBlob string) error // deleteVirtualMachine deletes a VM and any associated OS disk func (as *AgentPool) deleteVirtualMachine(name string) error { - vm, err := as.virtualMachinesClient.Get(as.config.ResourceGroup, name, "") + vm, err := as.manager.azClient.virtualMachinesClient.Get(as.manager.config.ResourceGroup, name, "") if err != nil { - glog.Errorf("failed to get VM: %s/%s: %s", as.config.ResourceGroup, name, err.Error()) + glog.Errorf("failed to get VM: %s/%s: %s", as.manager.config.ResourceGroup, name, err.Error()) return err } vhd := vm.VirtualMachineProperties.StorageProfile.OsDisk.Vhd managedDisk := vm.VirtualMachineProperties.StorageProfile.OsDisk.ManagedDisk if vhd == nil && managedDisk == nil { - glog.Errorf("failed to get a valid os disk URI for VM: %s/%s", as.config.ResourceGroup, name) + glog.Errorf("failed to get a valid os disk URI for VM: %s/%s", as.manager.config.ResourceGroup, name) return fmt.Errorf("os disk does not have a VHD URI") } @@ -460,25 +461,25 @@ func (as *AgentPool) deleteVirtualMachine(name string) error { var nicName string nicID := (*vm.VirtualMachineProperties.NetworkProfile.NetworkInterfaces)[0].ID if nicID == nil { - glog.Warningf("NIC ID is not set for VM (%s/%s)", as.config.ResourceGroup, name) + glog.Warningf("NIC ID is not set for VM (%s/%s)", as.manager.config.ResourceGroup, name) } else { nicName, err = resourceName(*nicID) if err != nil { return err } - glog.Infof("found nic name for VM (%s/%s): %s", as.config.ResourceGroup, name, nicName) + glog.Infof("found nic name for VM (%s/%s): %s", as.manager.config.ResourceGroup, name, nicName) } - glog.Infof("deleting VM: %s/%s", as.config.ResourceGroup, name) - _, deleteErrChan := as.virtualMachinesClient.Delete(as.config.ResourceGroup, name, nil) - glog.Infof("waiting for vm deletion: %s/%s", as.config.ResourceGroup, name) + glog.Infof("deleting VM: %s/%s", as.manager.config.ResourceGroup, name) + _, deleteErrChan := as.manager.azClient.virtualMachinesClient.Delete(as.manager.config.ResourceGroup, name, nil) + glog.Infof("waiting for vm deletion: %s/%s", as.manager.config.ResourceGroup, name) if err := <-deleteErrChan; err != nil { return err } if len(nicName) > 0 { - glog.Infof("deleting nic: %s/%s", as.config.ResourceGroup, nicName) - _, nicErrChan := as.interfacesClient.Delete(as.config.ResourceGroup, nicName, nil) - glog.Infof("waiting for nic deletion: %s/%s", as.config.ResourceGroup, nicName) + glog.Infof("deleting nic: %s/%s", as.manager.config.ResourceGroup, nicName) + _, nicErrChan := as.manager.azClient.interfacesClient.Delete(as.manager.config.ResourceGroup, nicName, nil) + glog.Infof("waiting for nic deletion: %s/%s", as.manager.config.ResourceGroup, nicName) if nicErr := <-nicErrChan; nicErr != nil { return nicErr } @@ -498,10 +499,10 @@ func (as *AgentPool) deleteVirtualMachine(name string) error { } } else if managedDisk != nil { if osDiskName == nil { - glog.Warningf("osDisk is not set for VM %s/%s", as.config.ResourceGroup, name) + glog.Warningf("osDisk is not set for VM %s/%s", as.manager.config.ResourceGroup, name) } else { - glog.Infof("deleting managed disk: %s/%s", as.config.ResourceGroup, *osDiskName) - _, diskErrChan := as.disksClient.Delete(as.config.ResourceGroup, *osDiskName, nil) + glog.Infof("deleting managed disk: %s/%s", as.manager.config.ResourceGroup, *osDiskName) + _, diskErrChan := as.manager.azClient.disksClient.Delete(as.manager.config.ResourceGroup, *osDiskName, nil) if err := <-diskErrChan; err != nil { return err @@ -511,3 +512,25 @@ func (as *AgentPool) deleteVirtualMachine(name string) error { return nil } + +// getAzureRef gets AzureRef fot the as. +func (as *AgentPool) getAzureRef() azureRef { + return as.azureRef +} + +func (as *AgentPool) getInstanceIDs() (map[azureRef]string, error) { + _, indexToVM, err := as.GetVMIndexes() + if err != nil { + return nil, err + } + + result := make(map[azureRef]string) + for i, vm := range indexToVM { + ref := azureRef{ + Name: vm.ID, + } + result[ref] = fmt.Sprintf("%d", i) + } + + return result, nil +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go index 1112a51399e6..2dca38f85894 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider.go @@ -17,8 +17,6 @@ limitations under the License. package azure import ( - "fmt" - "strconv" "strings" "github.com/golang/glog" @@ -37,21 +35,15 @@ const ( // AzureCloudProvider provides implementation of CloudProvider interface for Azure. type AzureCloudProvider struct { azureManager *AzureManager - nodeGroups []cloudprovider.NodeGroup resourceLimiter *cloudprovider.ResourceLimiter } // BuildAzureCloudProvider creates new AzureCloudProvider -func BuildAzureCloudProvider(azureManager *AzureManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*AzureCloudProvider, error) { +func BuildAzureCloudProvider(azureManager *AzureManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { azure := &AzureCloudProvider{ azureManager: azureManager, resourceLimiter: resourceLimiter, } - for _, spec := range specs { - if err := azure.addNodeGroup(spec); err != nil { - return nil, err - } - } return azure, nil } @@ -62,19 +54,6 @@ func (azure *AzureCloudProvider) Cleanup() error { return nil } -// addNodeGroup adds node group defined in string spec. Format: -// minNodes:maxNodes:scaleSetName -func (azure *AzureCloudProvider) addNodeGroup(spec string) error { - nodeGroup, err := azure.buildNodeGroup(spec) - if err != nil { - return err - } - - azure.nodeGroups = append(azure.nodeGroups, nodeGroup) - azure.azureManager.RegisterNodeGroup(nodeGroup) - return nil -} - // Name returns name of the cloud provider. func (azure *AzureCloudProvider) Name() string { return "azure" @@ -82,21 +61,23 @@ func (azure *AzureCloudProvider) Name() string { // NodeGroups returns all node groups configured for this cloud provider. func (azure *AzureCloudProvider) NodeGroups() []cloudprovider.NodeGroup { - result := make([]cloudprovider.NodeGroup, 0, len(azure.nodeGroups)) - for _, nodeGroup := range azure.nodeGroups { - result = append(result, nodeGroup) + asgs := azure.azureManager.getAsgs() + + ngs := make([]cloudprovider.NodeGroup, len(asgs)) + for i, asg := range asgs { + ngs[i] = asg } - return result + return ngs } // NodeGroupForNode returns the node group for the given node. func (azure *AzureCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) { glog.V(6).Infof("Searching for node group for the node: %s, %s\n", node.Spec.ExternalID, node.Spec.ProviderID) - ref := &AzureRef{ + ref := &azureRef{ Name: strings.ToLower(node.Spec.ProviderID), } - return azure.azureManager.GetNodeGroupForInstance(ref) + return azure.azureManager.GetAsgForInstance(ref) } // Pricing returns pricing model for this cloud provider or error if not available. @@ -123,55 +104,15 @@ func (azure *AzureCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLi // Refresh is called before every main loop and can be used to dynamically update cloud provider state. // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). func (azure *AzureCloudProvider) Refresh() error { - return nil -} - -// Create nodeGroup from provided spec. -// spec is in the following format: min-size:max-size:scale-set-name. -func (azure *AzureCloudProvider) buildNodeGroup(spec string) (cloudprovider.NodeGroup, error) { - tokens := strings.SplitN(spec, ":", 3) - if len(tokens) != 3 { - return nil, fmt.Errorf("wrong nodes configuration: %s", spec) - } - - minSize := 0 - maxSize := 0 - name := tokens[2] - if size, err := strconv.Atoi(tokens[0]); err == nil { - if size <= 0 { - return nil, fmt.Errorf("min size must be >= 1, got: %d", size) - } - minSize = size - } else { - return nil, fmt.Errorf("failed to set min size: %s, expected integer", tokens[0]) - } - - if size, err := strconv.Atoi(tokens[1]); err == nil { - if size < minSize { - return nil, fmt.Errorf("max size must be greater or equal to min size") - } - maxSize = size - } else { - return nil, fmt.Errorf("failed to set max size: %s, expected integer", tokens[1]) - } - - if tokens[2] == "" { - return nil, fmt.Errorf("scale set name must not be blank, got spec: %s", spec) - } - - if azure.azureManager.config.VMType == vmTypeStandard { - return NewAgentPool(name, minSize, maxSize, azure.azureManager) - } - - return NewScaleSet(name, minSize, maxSize, azure.azureManager) + return azure.azureManager.Refresh() } -// AzureRef contains a reference to some entity in Azure world. -type AzureRef struct { +// azureRef contains a reference to some entity in Azure world. +type azureRef struct { Name string } // GetKey returns key of the given azure reference. -func (m *AzureRef) GetKey() string { +func (m *azureRef) GetKey() string { return m.Name } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_manager.go b/cluster-autoscaler/cloudprovider/azure/azure_manager.go index 5a29b06e9ab7..bfac02311efd 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_manager.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_manager.go @@ -19,97 +19,37 @@ package azure import ( "fmt" "io" - "io/ioutil" "os" "strconv" "strings" - "sync" "time" - "github.com/Azure/azure-sdk-for-go/arm/compute" - "github.com/Azure/azure-sdk-for-go/arm/disk" - "github.com/Azure/azure-sdk-for-go/arm/network" - "github.com/Azure/azure-sdk-for-go/arm/resources/resources" - "github.com/Azure/azure-sdk-for-go/arm/storage" - "github.com/Azure/go-autorest/autorest" - "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" "github.com/golang/glog" "gopkg.in/gcfg.v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" ) const ( vmTypeVMSS = "vmss" vmTypeStandard = "standard" -) - -// VirtualMachineScaleSetsClient defines needed functions for azure compute.VirtualMachineScaleSetsClient. -type VirtualMachineScaleSetsClient interface { - Get(resourceGroupName string, vmScaleSetName string) (result compute.VirtualMachineScaleSet, err error) - CreateOrUpdate(resourceGroupName string, name string, parameters compute.VirtualMachineScaleSet, cancel <-chan struct{}) (<-chan compute.VirtualMachineScaleSet, <-chan error) - DeleteInstances(resourceGroupName string, vmScaleSetName string, vmInstanceIDs compute.VirtualMachineScaleSetVMInstanceRequiredIDs, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) -} - -// VirtualMachineScaleSetVMsClient defines needed functions for azure compute.VirtualMachineScaleSetVMsClient. -type VirtualMachineScaleSetVMsClient interface { - List(resourceGroupName string, virtualMachineScaleSetName string, filter string, selectParameter string, expand string) (result compute.VirtualMachineScaleSetVMListResult, err error) - ListNextResults(lastResults compute.VirtualMachineScaleSetVMListResult) (result compute.VirtualMachineScaleSetVMListResult, err error) -} - -// VirtualMachinesClient defines needed functions for azure compute.VirtualMachinesClient. -type VirtualMachinesClient interface { - Get(resourceGroupName string, VMName string, expand compute.InstanceViewTypes) (result compute.VirtualMachine, err error) - Delete(resourceGroupName string, VMName string, cancel <-chan struct{}) (<-chan compute.OperationStatusResponse, <-chan error) - List(resourceGroupName string) (result compute.VirtualMachineListResult, err error) - ListNextResults(lastResults compute.VirtualMachineListResult) (result compute.VirtualMachineListResult, err error) -} - -// InterfacesClient defines needed functions for azure network.InterfacesClient. -type InterfacesClient interface { - Delete(resourceGroupName string, networkInterfaceName string, cancel <-chan struct{}) (<-chan autorest.Response, <-chan error) -} - -// DeploymentsClient defines needed functions for azure network.DeploymentsClient. -type DeploymentsClient interface { - Get(resourceGroupName string, deploymentName string) (result resources.DeploymentExtended, err error) - ExportTemplate(resourceGroupName string, deploymentName string) (result resources.DeploymentExportResult, err error) - CreateOrUpdate(resourceGroupName string, deploymentName string, parameters resources.Deployment, cancel <-chan struct{}) (<-chan resources.DeploymentExtended, <-chan error) -} - -// DisksClient defines needed functions for azure disk.DisksClient. -type DisksClient interface { - Delete(resourceGroupName string, diskName string, cancel <-chan struct{}) (<-chan disk.OperationStatusResponse, <-chan error) -} -// AccountsClient defines needed functions for azure storage.AccountsClient. -type AccountsClient interface { - ListKeys(resourceGroupName string, accountName string) (result storage.AccountListKeysResult, err error) -} + scaleToZeroSupported = false + refreshInterval = 1 * time.Minute +) // AzureManager handles Azure communication and data caching. type AzureManager struct { - config *Config - env azure.Environment - - virtualMachineScaleSetsClient VirtualMachineScaleSetsClient - virtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient - virtualMachinesClient VirtualMachinesClient - deploymentsClient DeploymentsClient - interfacesClient InterfacesClient - disksClient DisksClient - storageAccountsClient AccountsClient - - nodeGroups []cloudprovider.NodeGroup - // cache of mapping from instance name to nodeGroup. - nodeGroupsCache map[AzureRef]cloudprovider.NodeGroup - // cache of mapping from instance name to instanceID. - instanceIDsCache map[string]string - - cacheMutex sync.Mutex - interrupt chan struct{} + config *Config + azClient *azClient + env azure.Environment + + asgCache *asgCache + lastRefresh time.Time + asgAutoDiscoverySpecs []cloudprovider.LabelAutoDiscoveryConfig + explicitlyConfigured map[azureRef]bool } // Config holds the configuration parsed from the --cloud-config flag @@ -155,7 +95,7 @@ func (c *Config) TrimSpace() { } // CreateAzureManager creates Azure Manager object to work with Azure. -func CreateAzureManager(configReader io.Reader) (*AzureManager, error) { +func CreateAzureManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*AzureManager, error) { var err error var cfg Config @@ -196,6 +136,7 @@ func CreateAzureManager(configReader io.Reader) (*AzureManager, error) { cfg.VMType = vmTypeVMSS } + // Defaulting env to Azure Public Cloud. env := azure.PublicCloud if cfg.Cloud != "" { env, err = azure.EnvironmentFromName(cfg.Cloud) @@ -204,355 +145,268 @@ func CreateAzureManager(configReader io.Reader) (*AzureManager, error) { } } - if cfg.ResourceGroup == "" { - return nil, fmt.Errorf("resource group not set") - } - - if cfg.SubscriptionID == "" { - return nil, fmt.Errorf("subscription ID not set") - } - - if cfg.TenantID == "" { - return nil, fmt.Errorf("tenant ID not set") - } - - if cfg.AADClientID == "" { - return nil, fmt.Errorf("ARM Client ID not set") - } - - if cfg.VMType == vmTypeStandard { - if cfg.Deployment == "" { - return nil, fmt.Errorf("deployment not set") - } - - if cfg.APIServerPrivateKey == "" { - return nil, fmt.Errorf("apiServerPrivateKey not set") - } - - if cfg.CAPrivateKey == "" { - return nil, fmt.Errorf("caPrivateKey not set") - } - - if cfg.ClientPrivateKey == "" { - return nil, fmt.Errorf("clientPrivateKey not set") - } - - if cfg.KubeConfigPrivateKey == "" { - return nil, fmt.Errorf("kubeConfigPrivateKey not set") - } + if err := validateConfig(&cfg); err != nil { + return nil, err } glog.Infof("Starting azure manager with subscription ID %q", cfg.SubscriptionID) - spt, err := NewServicePrincipalTokenFromCredentials(&cfg, &env) + azClient, err := newAzClient(&cfg, &env) if err != nil { return nil, err } - scaleSetsClient := compute.NewVirtualMachineScaleSetsClient(cfg.SubscriptionID) - scaleSetsClient.BaseURI = env.ResourceManagerEndpoint - scaleSetsClient.Authorizer = autorest.NewBearerAuthorizer(spt) - scaleSetsClient.PollingDelay = 5 * time.Second - configureUserAgent(&scaleSetsClient.Client) - glog.V(5).Infof("Created scale set client with authorizer: %v", scaleSetsClient) - - scaleSetVMsClient := compute.NewVirtualMachineScaleSetVMsClient(cfg.SubscriptionID) - scaleSetVMsClient.BaseURI = env.ResourceManagerEndpoint - scaleSetVMsClient.Authorizer = autorest.NewBearerAuthorizer(spt) - scaleSetVMsClient.PollingDelay = 5 * time.Second - configureUserAgent(&scaleSetVMsClient.Client) - glog.V(5).Infof("Created scale set vm client with authorizer: %v", scaleSetVMsClient) - - virtualMachinesClient := compute.NewVirtualMachinesClient(cfg.SubscriptionID) - virtualMachinesClient.BaseURI = env.ResourceManagerEndpoint - virtualMachinesClient.Authorizer = autorest.NewBearerAuthorizer(spt) - virtualMachinesClient.PollingDelay = 5 * time.Second - configureUserAgent(&virtualMachinesClient.Client) - glog.V(5).Infof("Created vm client with authorizer: %v", virtualMachinesClient) - - deploymentsClient := resources.NewDeploymentsClient(cfg.SubscriptionID) - deploymentsClient.BaseURI = env.ResourceManagerEndpoint - deploymentsClient.Authorizer = autorest.NewBearerAuthorizer(spt) - deploymentsClient.PollingDelay = 5 * time.Second - configureUserAgent(&deploymentsClient.Client) - glog.V(5).Infof("Created deployments client with authorizer: %v", deploymentsClient) - - interfacesClient := network.NewInterfacesClient(cfg.SubscriptionID) - interfacesClient.BaseURI = env.ResourceManagerEndpoint - interfacesClient.Authorizer = autorest.NewBearerAuthorizer(spt) - interfacesClient.PollingDelay = 5 * time.Second - glog.V(5).Infof("Created interfaces client with authorizer: %v", interfacesClient) - - storageAccountsClient := storage.NewAccountsClient(cfg.SubscriptionID) - storageAccountsClient.BaseURI = env.ResourceManagerEndpoint - storageAccountsClient.Authorizer = autorest.NewBearerAuthorizer(spt) - storageAccountsClient.PollingDelay = 5 * time.Second - glog.V(5).Infof("Created storage accounts client with authorizer: %v", storageAccountsClient) - - disksClient := disk.NewDisksClient(cfg.SubscriptionID) - disksClient.BaseURI = env.ResourceManagerEndpoint - disksClient.Authorizer = autorest.NewBearerAuthorizer(spt) - disksClient.PollingDelay = 5 * time.Second - glog.V(5).Infof("Created disks client with authorizer: %v", disksClient) - // Create azure manager. manager := &AzureManager{ - config: &cfg, - env: env, - disksClient: disksClient, - interfacesClient: interfacesClient, - virtualMachineScaleSetsClient: scaleSetsClient, - virtualMachineScaleSetVMsClient: scaleSetVMsClient, - deploymentsClient: deploymentsClient, - virtualMachinesClient: virtualMachinesClient, - storageAccountsClient: storageAccountsClient, - - interrupt: make(chan struct{}), - instanceIDsCache: make(map[string]string), - nodeGroups: make([]cloudprovider.NodeGroup, 0), - nodeGroupsCache: make(map[AzureRef]cloudprovider.NodeGroup), + config: &cfg, + env: env, + azClient: azClient, + explicitlyConfigured: make(map[azureRef]bool), } - go wait.Until(func() { - manager.cacheMutex.Lock() - defer manager.cacheMutex.Unlock() - if err := manager.regenerateCache(); err != nil { - glog.Errorf("Error while regenerating AS cache: %v", err) - } - }, 5*time.Minute, manager.interrupt) - - return manager, nil -} + cache, err := newAsgCache() + if err != nil { + return nil, err + } + manager.asgCache = cache -// NewServicePrincipalTokenFromCredentials creates a new ServicePrincipalToken using values of the -// passed credentials map. -func NewServicePrincipalTokenFromCredentials(config *Config, env *azure.Environment) (*adal.ServicePrincipalToken, error) { - oauthConfig, err := adal.NewOAuthConfig(env.ActiveDirectoryEndpoint, config.TenantID) + specs, err := discoveryOpts.ParseLabelAutoDiscoverySpecs() if err != nil { - return nil, fmt.Errorf("creating the OAuth config: %v", err) + return nil, err } + manager.asgAutoDiscoverySpecs = specs - if config.UseManagedIdentityExtension { - glog.V(2).Infoln("azure: using managed identity extension to retrieve access token") - msiEndpoint, err := adal.GetMSIVMEndpoint() - if err != nil { - return nil, fmt.Errorf("Getting the managed service identity endpoint: %v", err) - } - return adal.NewServicePrincipalTokenFromMSI( - msiEndpoint, - env.ServiceManagementEndpoint) + if err := manager.fetchExplicitAsgs(discoveryOpts.NodeGroupSpecs); err != nil { + return nil, err } - if len(config.AADClientSecret) > 0 { - glog.V(2).Infoln("azure: using client_id+client_secret to retrieve access token") - return adal.NewServicePrincipalToken( - *oauthConfig, - config.AADClientID, - config.AADClientSecret, - env.ServiceManagementEndpoint) + if err := manager.forceRefresh(); err != nil { + return nil, err } - if len(config.AADClientCertPath) > 0 && len(config.AADClientCertPassword) > 0 { - glog.V(2).Infoln("azure: using jwt client_assertion (client_cert+client_private_key) to retrieve access token") - certData, err := ioutil.ReadFile(config.AADClientCertPath) + return manager, nil +} + +func (m *AzureManager) fetchExplicitAsgs(specs []string) error { + changed := false + for _, spec := range specs { + asg, err := m.buildAsgFromSpec(spec) if err != nil { - return nil, fmt.Errorf("reading the client certificate from file %s: %v", config.AADClientCertPath, err) + return fmt.Errorf("failed to parse node group spec: %v", err) } - certificate, privateKey, err := decodePkcs12(certData, config.AADClientCertPassword) - if err != nil { - return nil, fmt.Errorf("decoding the client certificate: %v", err) + if m.RegisterAsg(asg) { + changed = true } - return adal.NewServicePrincipalTokenFromCertificate( - *oauthConfig, - config.AADClientID, - certificate, - privateKey, - env.ServiceManagementEndpoint) + m.explicitlyConfigured[asg.getAzureRef()] = true } - return nil, fmt.Errorf("No credentials provided for AAD application %s", config.AADClientID) + if changed { + if err := m.regenerateCache(); err != nil { + return err + } + } + return nil } -// RegisterNodeGroup registers node group in Azure Manager. -func (m *AzureManager) RegisterNodeGroup(nodeGroup cloudprovider.NodeGroup) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() +func (m *AzureManager) buildAsgFromSpec(spec string) (Asg, error) { + s, err := dynamic.SpecFromString(spec, scaleToZeroSupported) + if err != nil { + return nil, fmt.Errorf("failed to parse node group spec: %v", err) + } - m.nodeGroups = append(m.nodeGroups, nodeGroup) + switch m.config.VMType { + case vmTypeStandard: + return NewAgentPool(s, m) + case vmTypeVMSS: + return NewScaleSet(s, m) + default: + return nil, fmt.Errorf("vmtype %s not supported", m.config.VMType) + } } -func (m *AzureManager) nodeGroupRegisted(nodeGroup string) bool { - for _, ng := range m.nodeGroups { - if nodeGroup == ng.Id() { - return true - } +// Refresh is called before every main loop and can be used to dynamically update cloud provider state. +// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). +func (m *AzureManager) Refresh() error { + if m.lastRefresh.Add(refreshInterval).After(time.Now()) { + return nil } + return m.forceRefresh() +} - return false +func (m *AzureManager) forceRefresh() error { + if err := m.fetchAutoAsgs(); err != nil { + glog.Errorf("Failed to fetch ASGs: %v", err) + return err + } + m.lastRefresh = time.Now() + glog.V(2).Infof("Refreshed ASG list, next refresh after %v", m.lastRefresh.Add(refreshInterval)) + return nil } -// GetNodeGroupForInstance returns nodeGroup of the given Instance -func (m *AzureManager) GetNodeGroupForInstance(instance *AzureRef) (cloudprovider.NodeGroup, error) { - glog.V(5).Infof("Looking for node group for instance: %q", instance) +// Fetch automatically discovered ASGs. These ASGs should be unregistered if +// they no longer exist in Azure. +func (m *AzureManager) fetchAutoAsgs() error { + groups, err := m.getFilteredAutoscalingGroups(m.asgAutoDiscoverySpecs) + if err != nil { + return fmt.Errorf("cannot autodiscover ASGs: %s", err) + } - glog.V(8).Infof("Cache BEFORE: %v\n", m.nodeGroupsCache) + changed := false + exists := make(map[azureRef]bool) + for _, asg := range groups { + azRef := asg.getAzureRef() + exists[azRef] = true + if m.explicitlyConfigured[azRef] { + // This ASG was explicitly configured, but would also be + // autodiscovered. We want the explicitly configured min and max + // nodes to take precedence. + glog.V(3).Infof("Ignoring explicitly configured ASG %s for autodiscovery.", asg.Id()) + continue + } + if m.RegisterAsg(asg) { + glog.V(3).Infof("Autodiscovered ASG %s using tags %v", asg.Id(), m.asgAutoDiscoverySpecs) + changed = true + } + } - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() - if nodeGroup, found := m.nodeGroupsCache[*instance]; found { - return nodeGroup, nil + for _, asg := range m.getAsgs() { + azRef := asg.getAzureRef() + if !exists[azRef] && !m.explicitlyConfigured[azRef] { + m.UnregisterAsg(asg) + changed = true + } } - if err := m.regenerateCache(); err != nil { - return nil, fmt.Errorf("Error while looking for nodeGroup for instance %+v, error: %v", *instance, err) + if changed { + if err := m.regenerateCache(); err != nil { + return err + } } - glog.V(8).Infof("Cache AFTER: %v\n", m.nodeGroupsCache) + return nil +} - if nodeGroup, found := m.nodeGroupsCache[*instance]; found { - return nodeGroup, nil - } +func (m *AzureManager) getAsgs() []Asg { + return m.asgCache.get() +} - // instance does not belong to any configured nodeGroup. - return nil, nil +func (m *AzureManager) getInstanceIDs(instances []*azureRef) []string { + return m.asgCache.getInstanceIDs(instances) } -// GetInstanceIDs gets instanceIDs for specified instances. -func (m *AzureManager) GetInstanceIDs(instances []*AzureRef) []string { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() +// RegisterAsg registers an ASG. +func (m *AzureManager) RegisterAsg(asg Asg) bool { + return m.asgCache.Register(asg) +} - instanceIds := make([]string, len(instances)) - for i, instance := range instances { - instanceIds[i] = m.instanceIDsCache[instance.Name] - } +// UnregisterAsg unregisters an ASG. +func (m *AzureManager) UnregisterAsg(asg Asg) bool { + return m.asgCache.Unregister(asg) +} - return instanceIds +// GetAsgForInstance returns AsgConfig of the given Instance +func (m *AzureManager) GetAsgForInstance(instance *azureRef) (Asg, error) { + return m.asgCache.FindForInstance(instance) } -func (m *AzureManager) regenerateCache() (err error) { - var newCache map[AzureRef]cloudprovider.NodeGroup - var newInstanceIDsCache map[string]string +func (m *AzureManager) regenerateCache() error { + m.asgCache.mutex.Lock() + defer m.asgCache.mutex.Unlock() + return m.asgCache.regenerate() +} + +// Cleanup the ASG cache. +func (m *AzureManager) Cleanup() { + m.asgCache.Cleanup() +} +func (m *AzureManager) getFilteredAutoscalingGroups(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { switch m.config.VMType { case vmTypeVMSS: - newCache, newInstanceIDsCache, err = m.listScaleSets() + asgs, err = m.listScaleSets(filter) case vmTypeStandard: - newCache, newInstanceIDsCache, err = m.listAgentPools() + asgs, err = m.listAgentPools(filter) default: err = fmt.Errorf("vmType %q not supported", m.config.VMType) } if err != nil { - return err + return nil, err } - m.nodeGroupsCache = newCache - m.instanceIDsCache = newInstanceIDsCache - return nil + return asgs, nil } -func (m *AzureManager) getNodeGroupByID(id string) cloudprovider.NodeGroup { - for _, ng := range m.nodeGroups { - if id == ng.Id() { - return ng - } +// listScaleSets gets a list of scale sets and instanceIDs. +func (m *AzureManager) listScaleSets(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { + result, err := m.azClient.virtualMachineScaleSetsClient.List(m.config.ResourceGroup) + if err != nil { + glog.Errorf("VirtualMachineScaleSetsClient.List for %v failed: %v", m.config.ResourceGroup, err) + return nil, err } - return nil -} - -func (m *AzureManager) listAgentPools() (map[AzureRef]cloudprovider.NodeGroup, map[string]string, error) { - as := make(map[AzureRef]cloudprovider.NodeGroup) - instanceIDs := make(map[string]string) + moreResults := (result.Value != nil && len(*result.Value) > 0) + for moreResults { + for _, scaleSet := range *result.Value { + if len(filter) > 0 { + if scaleSet.Tags == nil || len(*scaleSet.Tags) == 0 { + continue + } - for _, nodeGroup := range m.nodeGroups { - agentPool, ok := nodeGroup.(*AgentPool) - if !ok { - return nil, nil, fmt.Errorf("node group %q is not AgentPool", nodeGroup) - } + if !matchDiscoveryConfig(*scaleSet.Tags, filter) { + continue + } + } - _, vmIndex, err := agentPool.GetVMIndexes() - if err != nil { - glog.Errorf("GetVMIndexes for node group %q failed: %v", nodeGroup.Id(), err) - return nil, nil, err + spec := &dynamic.NodeGroupSpec{ + Name: *scaleSet.Name, + MinSize: 1, + MaxSize: -1, + SupportScaleToZero: scaleToZeroSupported, + } + asg, _ := NewScaleSet(spec, m) + asgs = append(asgs, asg) } + moreResults = false - for idx := range vmIndex { - id := vmIndex[idx].ID - vmID := vmIndex[idx].VMID - - idRef := AzureRef{ - Name: id, - } - vmIDRef := AzureRef{ - Name: vmID, + if result.NextLink != nil { + result, err = m.azClient.virtualMachineScaleSetsClient.ListNextResults(result) + if err != nil { + glog.Errorf("VirtualMachineScaleSetsClient.ListNextResults for %v failed: %v", m.config.ResourceGroup, err) + return nil, err } - as[idRef] = nodeGroup - as[vmIDRef] = nodeGroup - instanceIDs[id] = fmt.Sprintf("%d", idx) - instanceIDs[vmID] = fmt.Sprintf("%d", idx) + + moreResults = (result.Value != nil && len(*result.Value) > 0) } + } - return as, instanceIDs, nil + return asgs, nil } -// listScaleSets gets a list of scale sets and instanceIDs. -func (m *AzureManager) listScaleSets() (map[AzureRef]cloudprovider.NodeGroup, map[string]string, error) { - var err error - scaleSets := make(map[AzureRef]cloudprovider.NodeGroup) - instanceIDs := make(map[string]string) - - for _, sset := range m.nodeGroups { - glog.V(4).Infof("Listing Scale Set information for %s", sset.Id()) - - resourceGroup := m.config.ResourceGroup - ssInfo, err := m.virtualMachineScaleSetsClient.Get(resourceGroup, sset.Id()) - if err != nil { - glog.Errorf("Failed to get scaleSet with name %s: %v", sset.Id(), err) - return nil, nil, err - } +// listAgentPools gets a list of agent pools and instanceIDs. +// Note: filter won't take effect for agent pools. +func (m *AzureManager) listAgentPools(filter []cloudprovider.LabelAutoDiscoveryConfig) (asgs []Asg, err error) { + deploy, err := m.azClient.deploymentsClient.Get(m.config.ResourceGroup, m.config.Deployment) + if err != nil { + glog.Errorf("deploymentsClient.Get(%s, %s) failed: %v", m.config.ResourceGroup, m.config.Deployment, err) + return nil, err + } - result, err := m.virtualMachineScaleSetVMsClient.List(resourceGroup, *ssInfo.Name, "", "", "") - if err != nil { - glog.Errorf("Failed to list vm for scaleSet %s: %v", *ssInfo.Name, err) - return nil, nil, err + for k := range *deploy.Properties.Parameters { + if k == "masterVMSize" || !strings.HasSuffix(k, "VMSize") { + continue } - moreResult := (result.Value != nil && len(*result.Value) > 0) - for moreResult { - for _, instance := range *result.Value { - // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). - name := "azure://" + strings.ToLower(*instance.ID) - vmID := "azure://" + strings.ToLower(*instance.VMID) - ref := AzureRef{ - Name: name, - } - vmIDRef := AzureRef{ - Name: vmID, - } - scaleSets[ref] = sset - scaleSets[vmIDRef] = sset - instanceIDs[name] = *instance.InstanceID - } - - moreResult = false - if result.NextLink != nil { - result, err = m.virtualMachineScaleSetVMsClient.ListNextResults(result) - if err != nil { - glog.Errorf("virtualMachineScaleSetVMsClient.ListNextResults failed: %v", err) - return nil, nil, err - } - - moreResult = (result.Value != nil && len(*result.Value) > 0) - } + poolName := strings.TrimRight(k, "VMSize") + spec := &dynamic.NodeGroupSpec{ + Name: poolName, + MinSize: 1, + MaxSize: -1, + SupportScaleToZero: scaleToZeroSupported, } + asg, _ := NewAgentPool(spec, m) + asgs = append(asgs, asg) } - return scaleSets, instanceIDs, err -} - -// Cleanup closes the channel to signal the go routine to stop that is handling the cache -func (m *AzureManager) Cleanup() { - close(m.interrupt) + return asgs, nil } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go index 608a2b59bd18..74e48cb67b0b 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set.go @@ -25,27 +25,28 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" ) // ScaleSet implements NodeGroup interface. type ScaleSet struct { - AzureRef - *AzureManager + azureRef + manager *AzureManager minSize int maxSize int } // NewScaleSet creates a new NewScaleSet. -func NewScaleSet(name string, minSize, maxSize int, az *AzureManager) (*ScaleSet, error) { +func NewScaleSet(spec *dynamic.NodeGroupSpec, az *AzureManager) (*ScaleSet, error) { scaleSet := &ScaleSet{ - AzureRef: AzureRef{ - Name: name, + azureRef: azureRef{ + Name: spec.Name, }, - minSize: minSize, - maxSize: maxSize, - AzureManager: az, + minSize: spec.MinSize, + maxSize: spec.MaxSize, + manager: az, } return scaleSet, nil @@ -86,8 +87,8 @@ func (scaleSet *ScaleSet) MaxSize() int { // GetScaleSetSize gets Scale Set size. func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { glog.V(5).Infof("Get scale set size for %q", scaleSet.Name) - resourceGroup := scaleSet.config.ResourceGroup - set, err := scaleSet.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) + resourceGroup := scaleSet.manager.config.ResourceGroup + set, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) if err != nil { return -1, err } @@ -97,8 +98,8 @@ func (scaleSet *ScaleSet) GetScaleSetSize() (int64, error) { // SetScaleSetSize sets ScaleSet size. func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { - resourceGroup := scaleSet.config.ResourceGroup - op, err := scaleSet.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) + resourceGroup := scaleSet.manager.config.ResourceGroup + op, err := scaleSet.manager.azClient.virtualMachineScaleSetsClient.Get(resourceGroup, scaleSet.Name) if err != nil { return err } @@ -107,7 +108,7 @@ func (scaleSet *ScaleSet) SetScaleSetSize(size int64) error { op.VirtualMachineScaleSetProperties.ProvisioningState = nil cancel := make(chan struct{}) - _, errChan := scaleSet.virtualMachineScaleSetsClient.CreateOrUpdate(resourceGroup, scaleSet.Name, op, cancel) + _, errChan := scaleSet.manager.azClient.virtualMachineScaleSetsClient.CreateOrUpdate(resourceGroup, scaleSet.Name, op, cancel) return <-errChan } @@ -137,23 +138,28 @@ func (scaleSet *ScaleSet) IncreaseSize(delta int) error { } // GetScaleSetVms returns list of nodes for the given scale set. -func (scaleSet *ScaleSet) GetScaleSetVms() ([]string, error) { - resourceGroup := scaleSet.config.ResourceGroup - instances, err := scaleSet.virtualMachineScaleSetVMsClient.List(resourceGroup, scaleSet.Name, "", "", "") - +func (scaleSet *ScaleSet) GetScaleSetVms() ([]compute.VirtualMachineScaleSetVM, error) { + allVMs := make([]compute.VirtualMachineScaleSetVM, 0) + resourceGroup := scaleSet.manager.config.ResourceGroup + result, err := scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.List(resourceGroup, scaleSet.Name, "", "", "") if err != nil { glog.V(4).Infof("VirtualMachineScaleSetVMsClient.List failed for %s: %v", scaleSet.Name, err) - return []string{}, err + return nil, err } - result := make([]string, 0) - for _, instance := range *instances.Value { - // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). - name := "azure://" + strings.ToLower(*instance.ID) - result = append(result, name) + moreResults := (result.Value != nil && len(*result.Value) > 0) + for moreResults { + allVMs = append(allVMs, *result.Value...) + moreResults = false + + result, err = scaleSet.manager.azClient.virtualMachineScaleSetVMsClient.ListNextResults(result) + if err != nil { + return nil, err + } + moreResults = (result.Value != nil && len(*result.Value) > 0) } - return result, nil + return allVMs, nil } // DecreaseTargetSize decreases the target size of the node group. This function @@ -171,7 +177,7 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error { return err } - nodes, err := scaleSet.GetScaleSetVms() + nodes, err := scaleSet.Nodes() if err != nil { return err } @@ -188,11 +194,11 @@ func (scaleSet *ScaleSet) DecreaseTargetSize(delta int) error { func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) { glog.V(6).Infof("Check if node belongs to this scale set: scaleset:%v, node:%v\n", scaleSet, node) - ref := &AzureRef{ + ref := &azureRef{ Name: strings.ToLower(node.Spec.ProviderID), } - targetAsg, err := scaleSet.GetNodeGroupForInstance(ref) + targetAsg, err := scaleSet.manager.GetAsgForInstance(ref) if err != nil { return false, err } @@ -206,18 +212,18 @@ func (scaleSet *ScaleSet) Belongs(node *apiv1.Node) (bool, error) { } // DeleteInstances deletes the given instances. All instances must be controlled by the same ASG. -func (scaleSet *ScaleSet) DeleteInstances(instances []*AzureRef) error { +func (scaleSet *ScaleSet) DeleteInstances(instances []*azureRef) error { if len(instances) == 0 { return nil } - commonAsg, err := scaleSet.GetNodeGroupForInstance(instances[0]) + commonAsg, err := scaleSet.manager.GetAsgForInstance(instances[0]) if err != nil { return err } for _, instance := range instances { - asg, err := scaleSet.GetNodeGroupForInstance(instance) + asg, err := scaleSet.manager.GetAsgForInstance(instance) if err != nil { return err } @@ -227,13 +233,13 @@ func (scaleSet *ScaleSet) DeleteInstances(instances []*AzureRef) error { } } - instanceIds := scaleSet.GetInstanceIDs(instances) + instanceIds := scaleSet.manager.getInstanceIDs(instances) requiredIds := &compute.VirtualMachineScaleSetVMInstanceRequiredIDs{ InstanceIds: &instanceIds, } cancel := make(chan struct{}) - resourceGroup := scaleSet.config.ResourceGroup - _, errChan := scaleSet.virtualMachineScaleSetsClient.DeleteInstances(resourceGroup, commonAsg.Id(), *requiredIds, cancel) + resourceGroup := scaleSet.manager.config.ResourceGroup + _, errChan := scaleSet.manager.azClient.virtualMachineScaleSetsClient.DeleteInstances(resourceGroup, commonAsg.Id(), *requiredIds, cancel) return <-errChan } @@ -249,7 +255,7 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { return fmt.Errorf("min size reached, nodes will not be deleted") } - refs := make([]*AzureRef, 0, len(nodes)) + refs := make([]*azureRef, 0, len(nodes)) for _, node := range nodes { belongs, err := scaleSet.Belongs(node) if err != nil { @@ -260,10 +266,10 @@ func (scaleSet *ScaleSet) DeleteNodes(nodes []*apiv1.Node) error { return fmt.Errorf("%s belongs to a different asg than %s", node.Name, scaleSet.Id()) } - azureRef := &AzureRef{ + ref := &azureRef{ Name: strings.ToLower(node.Spec.ProviderID), } - refs = append(refs, azureRef) + refs = append(refs, ref) } return scaleSet.DeleteInstances(refs) @@ -286,5 +292,40 @@ func (scaleSet *ScaleSet) TemplateNodeInfo() (*schedulercache.NodeInfo, error) { // Nodes returns a list of all nodes that belong to this node group. func (scaleSet *ScaleSet) Nodes() ([]string, error) { - return scaleSet.GetScaleSetVms() + vms, err := scaleSet.GetScaleSetVms() + if err != nil { + return nil, err + } + + result := make([]string, len(vms)) + for i := range vms { + // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). + name := "azure://" + strings.ToLower(*vms[i].ID) + result = append(result, name) + } + + return result, nil +} + +func (scaleSet *ScaleSet) getInstanceIDs() (map[azureRef]string, error) { + vms, err := scaleSet.GetScaleSetVms() + if err != nil { + return nil, err + } + + result := make(map[azureRef]string) + for i := range vms { + // Convert to lower because instance.ID is in different in different API calls (e.g. GET and LIST). + ref := azureRef{ + Name: "azure://" + strings.ToLower(*vms[i].ID), + } + result[ref] = *vms[i].InstanceID + } + + return result, nil +} + +// GetAzureRef gets AzureRef fot the scale set. +func (scaleSet *ScaleSet) getAzureRef() azureRef { + return scaleSet.azureRef } diff --git a/cluster-autoscaler/cloudprovider/azure/azure_util.go b/cluster-autoscaler/cloudprovider/azure/azure_util.go index 623082798344..963395c890ed 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_util.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_util.go @@ -30,6 +30,7 @@ import ( "github.com/Azure/go-autorest/autorest" "github.com/golang/glog" "golang.org/x/crypto/pkcs12" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/client-go/pkg/version" ) @@ -347,3 +348,64 @@ func GetVMNameIndex(osType compute.OperatingSystemTypes, vmName string) (int, er return agentIndex, nil } + +func matchDiscoveryConfig(labels map[string]*string, configs []cloudprovider.LabelAutoDiscoveryConfig) bool { + for _, c := range configs { + for k, v := range c.Selector { + value, ok := labels[k] + if !ok { + return false + } + + if len(v) > 0 { + if value == nil || *value != v { + return false + } + } + } + } + + return true +} + +func validateConfig(cfg *Config) error { + if cfg.ResourceGroup == "" { + return fmt.Errorf("resource group not set") + } + + if cfg.SubscriptionID == "" { + return fmt.Errorf("subscription ID not set") + } + + if cfg.TenantID == "" { + return fmt.Errorf("tenant ID not set") + } + + if cfg.AADClientID == "" { + return fmt.Errorf("ARM Client ID not set") + } + + if cfg.VMType == vmTypeStandard { + if cfg.Deployment == "" { + return fmt.Errorf("deployment not set") + } + + if cfg.APIServerPrivateKey == "" { + return fmt.Errorf("apiServerPrivateKey not set") + } + + if cfg.CAPrivateKey == "" { + return fmt.Errorf("caPrivateKey not set") + } + + if cfg.ClientPrivateKey == "" { + return fmt.Errorf("clientPrivateKey not set") + } + + if cfg.KubeConfigPrivateKey == "" { + return fmt.Errorf("kubeConfigPrivateKey not set") + } + } + + return nil +} diff --git a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go index 8cf8ca8ab3b5..f93f27abfbd0 100644 --- a/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go +++ b/cluster-autoscaler/cloudprovider/builder/cloud_provider_builder.go @@ -155,11 +155,11 @@ func (b CloudProviderBuilder) buildAzure(do cloudprovider.NodeGroupDiscoveryOpti } else { glog.Info("Creating Azure Manager with default configuration.") } - manager, err := azure.CreateAzureManager(config) + manager, err := azure.CreateAzureManager(config, do) if err != nil { glog.Fatalf("Failed to create Azure Manager: %v", err) } - provider, err := azure.BuildAzureCloudProvider(manager, do.NodeGroupSpecs, rl) + provider, err := azure.BuildAzureCloudProvider(manager, rl) if err != nil { glog.Fatalf("Failed to create Azure cloud provider: %v", err) } diff --git a/cluster-autoscaler/cloudprovider/node_group_discovery_options.go b/cluster-autoscaler/cloudprovider/node_group_discovery_options.go index 6420fe9df8c6..31d2c2e21659 100644 --- a/cluster-autoscaler/cloudprovider/node_group_discovery_options.go +++ b/cluster-autoscaler/cloudprovider/node_group_discovery_options.go @@ -25,8 +25,9 @@ import ( ) const ( - autoDiscovererTypeMIG = "mig" - autoDiscovererTypeASG = "asg" + autoDiscovererTypeMIG = "mig" + autoDiscovererTypeASG = "asg" + autoDiscovererTypeLabel = "label" migAutoDiscovererKeyPrefix = "namePrefix" migAutoDiscovererKeyMinNodes = "min" @@ -93,6 +94,20 @@ func (o NodeGroupDiscoveryOptions) ParseASGAutoDiscoverySpecs() ([]ASGAutoDiscov return cfgs, nil } +// ParseLabelAutoDiscoverySpecs returns any provided NodeGroupAutoDiscoverySpecs +// parsed into configuration appropriate for ASG autodiscovery. +func (o NodeGroupDiscoveryOptions) ParseLabelAutoDiscoverySpecs() ([]LabelAutoDiscoveryConfig, error) { + cfgs := make([]LabelAutoDiscoveryConfig, len(o.NodeGroupAutoDiscoverySpecs)) + var err error + for i, spec := range o.NodeGroupAutoDiscoverySpecs { + cfgs[i], err = parseLabelAutoDiscoverySpec(spec) + if err != nil { + return nil, err + } + } + return cfgs, nil +} + // A MIGAutoDiscoveryConfig specifies how to autodiscover GCE MIGs. type MIGAutoDiscoveryConfig struct { // Re is a regexp passed using the eq filter to the GCE list API. @@ -155,7 +170,7 @@ func parseMIGAutoDiscoverySpec(spec string) (MIGAutoDiscoveryConfig, error) { // An ASGAutoDiscoveryConfig specifies how to autodiscover AWS ASGs. type ASGAutoDiscoveryConfig struct { // TagKeys to match on. - // Any ASG with all of the provided tag keys will be autoscaled. + // Any ASG with all of the provided tag keys wMIGAutoDiscoveryConfigill be autoscaled. TagKeys []string } @@ -188,3 +203,36 @@ func parseASGAutoDiscoverySpec(spec string) (ASGAutoDiscoveryConfig, error) { } return cfg, nil } + +// A LabelAutoDiscoveryConfig specifies how to autodiscover Azure scale sets. +type LabelAutoDiscoveryConfig struct { + // Key-values to match on. + Selector map[string]string +} + +func parseLabelAutoDiscoverySpec(spec string) (LabelAutoDiscoveryConfig, error) { + cfg := LabelAutoDiscoveryConfig{ + Selector: make(map[string]string), + } + + tokens := strings.Split(spec, ":") + if len(tokens) != 2 { + return cfg, fmt.Errorf("spec \"%s\" should be discoverer:key=value,key=value", spec) + } + discoverer := tokens[0] + if discoverer != autoDiscovererTypeLabel { + return cfg, fmt.Errorf("unsupported discoverer specified: %s", discoverer) + } + + for _, arg := range strings.Split(tokens[1], ",") { + kv := strings.Split(arg, "=") + if len(kv) != 2 { + return cfg, fmt.Errorf("invalid key=value pair %s", kv) + } + + k, v := kv[0], kv[1] + cfg.Selector[k] = v + } + + return cfg, nil +} From e737cbb0f9b1299169aeef31c765fc7e4db3586d Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Mon, 15 Jan 2018 17:18:43 +0800 Subject: [PATCH 5/5] Add unit tests --- .../azure/azure_cloud_provider_test.go | 130 +++++++----------- .../azure/azure_scale_set_test.go | 115 ++++++++-------- 2 files changed, 112 insertions(+), 133 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go index 31700b62e541..b0f3a1822513 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_cloud_provider_test.go @@ -19,6 +19,8 @@ package azure import ( "testing" + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/resources/resources" "github.com/Azure/go-autorest/autorest/azure" "github.com/stretchr/testify/assert" @@ -26,85 +28,84 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) -func newTestAzureManager() *AzureManager { - return &AzureManager{ - config: &Config{VMType: vmTypeVMSS}, - env: azure.PublicCloud, - interrupt: make(chan struct{}), - instanceIDsCache: make(map[string]string), - nodeGroups: make([]cloudprovider.NodeGroup, 0), - nodeGroupsCache: make(map[AzureRef]cloudprovider.NodeGroup), - - disksClient: &DisksClientMock{}, - interfacesClient: &InterfacesClientMock{}, - storageAccountsClient: &AccountsClientMock{}, - deploymentsClient: &DeploymentsClientMock{}, - virtualMachinesClient: &VirtualMachinesClientMock{}, - virtualMachineScaleSetsClient: &VirtualMachineScaleSetsClientMock{}, - virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{}, +func newTestAzureManager(t *testing.T) *AzureManager { + manager := &AzureManager{ + env: azure.PublicCloud, + explicitlyConfigured: make(map[azureRef]bool), + config: &Config{ + ResourceGroup: "test", + VMType: vmTypeVMSS, + }, + + azClient: &azClient{ + disksClient: &DisksClientMock{}, + interfacesClient: &InterfacesClientMock{}, + storageAccountsClient: &AccountsClientMock{}, + deploymentsClient: &DeploymentsClientMock{ + FakeStore: make(map[string]resources.DeploymentExtended), + }, + virtualMachinesClient: &VirtualMachinesClientMock{ + FakeStore: make(map[string]map[string]compute.VirtualMachine), + }, + virtualMachineScaleSetsClient: &VirtualMachineScaleSetsClientMock{ + FakeStore: make(map[string]map[string]compute.VirtualMachineScaleSet), + }, + virtualMachineScaleSetVMsClient: &VirtualMachineScaleSetVMsClientMock{}, + }, } + cache, error := newAsgCache() + assert.NoError(t, error) + + manager.asgCache = cache + return manager } -func newTestProvider() (*AzureCloudProvider, error) { - manager := newTestAzureManager() +func newTestProvider(t *testing.T) *AzureCloudProvider { + manager := newTestAzureManager(t) resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - return BuildAzureCloudProvider(manager, nil, resourceLimiter) + + return &AzureCloudProvider{ + azureManager: manager, + resourceLimiter: resourceLimiter, + } } func TestBuildAzureCloudProvider(t *testing.T) { resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - m := newTestAzureManager() - _, err := BuildAzureCloudProvider(m, []string{"bad spec"}, resourceLimiter) - assert.Error(t, err) - - _, err = BuildAzureCloudProvider(m, nil, resourceLimiter) + m := newTestAzureManager(t) + _, err := BuildAzureCloudProvider(m, resourceLimiter) assert.NoError(t, err) } -func TestAddNodeGroup(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - err = provider.addNodeGroup("bad spec") - assert.Error(t, err) - assert.Equal(t, len(provider.nodeGroups), 0) - - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) -} - func TestName(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) + provider := newTestProvider(t) assert.Equal(t, provider.Name(), "azure") } func TestNodeGroups(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) + provider := newTestProvider(t) assert.Equal(t, len(provider.NodeGroups()), 0) - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) assert.Equal(t, len(provider.NodeGroups()), 1) } func TestNodeGroupForNode(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) node := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure://123E4567-E89B-12D3-A456-426655440000", + ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID, }, } group, err := provider.NodeGroupForNode(node) @@ -124,32 +125,3 @@ func TestNodeGroupForNode(t *testing.T) { assert.NoError(t, err) assert.Nil(t, group) } - -func TestBuildNodeGroup(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - _, err = provider.buildNodeGroup("a") - assert.Error(t, err) - _, err = provider.buildNodeGroup("a:b:c") - assert.Error(t, err) - _, err = provider.buildNodeGroup("1:") - assert.Error(t, err) - _, err = provider.buildNodeGroup("1:2:") - assert.Error(t, err) - - _, err = provider.buildNodeGroup("-1:2:") - assert.Error(t, err) - - _, err = provider.buildNodeGroup("5:3:") - assert.Error(t, err) - - _, err = provider.buildNodeGroup("5:ddd:test-name") - assert.Error(t, err) - - asg, err := provider.buildNodeGroup("111:222:test-name") - assert.NoError(t, err) - assert.Equal(t, 111, asg.MinSize()) - assert.Equal(t, 222, asg.MaxSize()) - assert.Equal(t, "test-name", asg.Id()) -} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go index c7c21034d6e5..9286f8dbbf57 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_scale_set_test.go @@ -28,105 +28,112 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" ) -func TestMaxSize(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) +func newTestScaleSet(manager *AzureManager, name string) *ScaleSet { + return &ScaleSet{ + azureRef: azureRef{ + Name: name, + }, + manager: manager, + minSize: 1, + maxSize: 5, + } +} - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) - assert.Equal(t, provider.nodeGroups[0].MaxSize(), 5) +func TestMaxSize(t *testing.T) { + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) + assert.Equal(t, provider.NodeGroups()[0].MaxSize(), 5) } func TestMinSize(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) - assert.Equal(t, provider.nodeGroups[0].MinSize(), 1) + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) + assert.Equal(t, provider.NodeGroups()[0].MinSize(), 1) } func TestTargetSize(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) - err = provider.addNodeGroup("1:5:test-asg") + targetSize, err := provider.NodeGroups()[0].TargetSize() assert.NoError(t, err) - targetSize, err := provider.nodeGroups[0].TargetSize() assert.Equal(t, targetSize, 2) - assert.NoError(t, err) } func TestIncreaseSize(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) - err = provider.nodeGroups[0].IncreaseSize(1) + err := provider.NodeGroups()[0].IncreaseSize(1) assert.NoError(t, err) } func TestBelongs(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) - scaleSet, ok := provider.nodeGroups[0].(*ScaleSet) + scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) invalidNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure:///subscriptions/subscriptionId/resourceGroups/kubernetes/providers/Microsoft.Compute/virtualMachines/invalid-instance-id", + ProviderID: "azure:///subscriptions/test-subscrition-id/resourceGroups/invalid-asg/providers/Microsoft.Compute/virtualMachineScaleSets/agents/virtualMachines/0", }, } - _, err = scaleSet.Belongs(invalidNode) + _, err := scaleSet.Belongs(invalidNode) assert.Error(t, err) validNode := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure://123E4567-E89B-12D3-A456-426655440000", + ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID, }, } - belongs, err := scaleSet.Belongs(validNode) assert.Equal(t, true, belongs) assert.NoError(t, err) } func TestDeleteNodes(t *testing.T) { - manager := newTestAzureManager() + manager := newTestAzureManager(t) scaleSetClient := &VirtualMachineScaleSetsClientMock{} - instanceIds := make([]string, 1) - instanceIds[0] = "test-instance-id" response := autorest.Response{ Response: &http.Response{ Status: "OK", }, } scaleSetClient.On("DeleteInstances", mock.Anything, "test-asg", mock.Anything, mock.Anything).Return(response, nil) - manager.virtualMachineScaleSetsClient = scaleSetClient + manager.azClient.virtualMachineScaleSetsClient = scaleSetClient resourceLimiter := cloudprovider.NewResourceLimiter( map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}) - provider, err := BuildAzureCloudProvider(manager, nil, resourceLimiter) - assert.NoError(t, err) - err = provider.addNodeGroup("1:5:test-asg") + provider, err := BuildAzureCloudProvider(manager, resourceLimiter) assert.NoError(t, err) + registered := manager.RegisterAsg( + newTestScaleSet(manager, "test-asg")) + assert.True(t, registered) + node := &apiv1.Node{ Spec: apiv1.NodeSpec{ - ProviderID: "azure://123E4567-E89B-12D3-A456-426655440000", + ProviderID: "azure://" + fakeVirtualMachineScaleSetVMID, }, } - scaleSet, ok := provider.nodeGroups[0].(*ScaleSet) + scaleSet, ok := provider.NodeGroups()[0].(*ScaleSet) assert.True(t, ok) err = scaleSet.DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) @@ -134,19 +141,19 @@ func TestDeleteNodes(t *testing.T) { } func TestId(t *testing.T) { - provider, err := newTestProvider() - assert.NoError(t, err) - err = provider.addNodeGroup("1:5:test-asg") - assert.NoError(t, err) - assert.Equal(t, len(provider.nodeGroups), 1) - assert.Equal(t, provider.nodeGroups[0].Id(), "test-asg") + provider := newTestProvider(t) + registered := provider.azureManager.RegisterAsg( + newTestScaleSet(provider.azureManager, "test-asg")) + assert.True(t, registered) + assert.Equal(t, len(provider.NodeGroups()), 1) + assert.Equal(t, provider.NodeGroups()[0].Id(), "test-asg") } func TestDebug(t *testing.T) { asg := ScaleSet{ - AzureManager: newTestAzureManager(), - minSize: 5, - maxSize: 55, + manager: newTestAzureManager(t), + minSize: 5, + maxSize: 55, } asg.Name = "test-scale-set" assert.Equal(t, asg.Debug(), "test-scale-set (5:55)")