Skip to content

Commit 543e339

Browse files
committed
chore: rework Scaleway cloudprovider integration
1 parent 503bdff commit 543e339

File tree

6 files changed

+2272
-652
lines changed

6 files changed

+2272
-652
lines changed

cluster-autoscaler/cloudprovider/scaleway/scaleway_cloud_provider.go

Lines changed: 134 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"encoding/json"
2222
"fmt"
2323
"io"
24-
"io/ioutil"
2524
"math"
2625
"os"
2726
"time"
@@ -38,7 +37,10 @@ import (
3837

3938
const (
4039
// GPULabel is the label added to GPU nodes
41-
GPULabel = "k8s.scaleway.com/gpu"
40+
GPULabel = "k8s.scw.cloud/gpu"
41+
42+
// DefaultRefreshInterval is the default refresh interval for the cloud provider
43+
DefaultRefreshInterval = 60 * time.Second
4244
)
4345

4446
type scalewayCloudProvider struct {
@@ -47,21 +49,30 @@ type scalewayCloudProvider struct {
4749
// ClusterID is the cluster id where the Autoscaler is running.
4850
clusterID string
4951
// nodeGroups is an abstraction around the Pool object returned by the API
50-
nodeGroups []*NodeGroup
52+
// key is the Pool ID
53+
nodeGroups map[string]*NodeGroup
54+
// providerNodeGroups is a pre-converted slice of node groups for NodeGroups() method
55+
providerNodeGroups []cloudprovider.NodeGroup
56+
// refreshInterval is the minimum duration between refreshes
57+
refreshInterval time.Duration
58+
// lastRefresh is the last time the nodes and node groups were refreshed from the API
59+
lastRefresh time.Time
60+
// lastRefreshError stores the error from the last refresh, if any
61+
lastRefreshError error
5162

5263
resourceLimiter *cloudprovider.ResourceLimiter
5364
}
5465

5566
func readConf(config *scalewaygo.Config, configFile io.Reader) error {
56-
body, err := ioutil.ReadAll(configFile)
67+
body, err := io.ReadAll(configFile)
5768
if err != nil {
5869
return err
5970
}
6071
err = json.Unmarshal(body, config)
6172
return err
6273
}
6374

64-
func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) *scalewayCloudProvider {
75+
func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl *cloudprovider.ResourceLimiter) (*scalewayCloudProvider, error) {
6576
getenvOr := func(key, defaultValue string) string {
6677
value := os.Getenv(key)
6778
if value != "" {
@@ -84,6 +95,7 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl
8495
cfg.SecretKey = getenvOr("SCW_SECRET_KEY", cfg.SecretKey)
8596
cfg.Region = getenvOr("SCW_REGION", cfg.Region)
8697
cfg.ApiUrl = getenvOr("SCW_API_URL", cfg.ApiUrl)
98+
cfg.DefaultCacheControl = DefaultRefreshInterval
8799

88100
cfg.UserAgent = defaultUserAgent
89101

@@ -92,21 +104,26 @@ func newScalewayCloudProvider(configFile io.Reader, defaultUserAgent string, rl
92104
klog.Fatalf("failed to create scaleway cloud provider: %v", err)
93105
}
94106

95-
klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,SecretKey=%s-***,Region=%s,ApiURL=%s", cfg.ClusterID, client.Token()[:8], client.Region(), client.ApiURL())
107+
klog.V(4).Infof("Scaleway Cloud Provider built; ClusterId=%s,Region=%s,ApiURL=%s", cfg.ClusterID, client.Region(), client.ApiURL())
96108

97-
return &scalewayCloudProvider{
109+
provider := &scalewayCloudProvider{
98110
client: client,
99111
clusterID: cfg.ClusterID,
100112
resourceLimiter: rl,
113+
refreshInterval: DefaultRefreshInterval,
114+
}
115+
116+
// Perform initial refresh to populate node groups cache
117+
if err := provider.Refresh(); err != nil {
118+
klog.Errorf("Failed to perform initial refresh: %v", err)
119+
return nil, err
101120
}
121+
122+
return provider, nil
102123
}
103124

104125
// BuildScaleway returns CloudProvider implementation for Scaleway.
105-
func BuildScaleway(
106-
opts *coreoptions.AutoscalerOptions,
107-
do cloudprovider.NodeGroupDiscoveryOptions,
108-
rl *cloudprovider.ResourceLimiter,
109-
) cloudprovider.CloudProvider {
126+
func BuildScaleway(opts *coreoptions.AutoscalerOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider {
110127
var configFile io.Reader
111128

112129
if opts.CloudConfig != "" {
@@ -123,79 +140,63 @@ func BuildScaleway(
123140
}()
124141
}
125142
}
126-
return newScalewayCloudProvider(configFile, opts.UserAgent, rl)
143+
144+
provider, err := newScalewayCloudProvider(configFile, opts.UserAgent, rl)
145+
if err != nil {
146+
klog.Fatalf("Failed to create Scaleway cloud provider: %v", err)
147+
}
148+
return provider
127149
}
128150

129-
// Name returns 'scaleway'
151+
// Name returns name of the cloud provider.
130152
func (*scalewayCloudProvider) Name() string {
131153
return cloudprovider.ScalewayProviderName
132154
}
133155

134-
// NodeGroups returns all node groups configured for this cluster.
135-
// critical endpoint, make it fast
156+
// NodeGroups returns all node groups configured for this cloud provider.
136157
func (scw *scalewayCloudProvider) NodeGroups() []cloudprovider.NodeGroup {
137-
138158
klog.V(4).Info("NodeGroups,ClusterID=", scw.clusterID)
139159

140-
nodeGroups := make([]cloudprovider.NodeGroup, len(scw.nodeGroups))
141-
for i, ng := range scw.nodeGroups {
142-
nodeGroups[i] = ng
143-
}
144-
return nodeGroups
145-
}
146-
147-
func (scw *scalewayCloudProvider) nodeGroupForNode(node *apiv1.Node) (*NodeGroup, error) {
148-
for _, ng := range scw.nodeGroups {
149-
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
150-
return ng, nil
151-
}
152-
}
153-
return nil, nil
160+
return scw.providerNodeGroups
154161
}
155162

156163
// NodeGroupForNode returns the node group for the given node, nil if the node
157164
// should not be processed by cluster autoscaler, or non-nil error if such
158-
// occurred.
159-
// critical endpoint, make it fast
165+
// occurred. Must be implemented.
160166
func (scw *scalewayCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.NodeGroup, error) {
161167
klog.V(4).Infof("NodeGroupForNode,NodeSpecProviderID=%s", node.Spec.ProviderID)
162168

163-
return scw.nodeGroupForNode(node)
164-
}
165-
166-
// HasInstance returns whether a given node has a corresponding instance in this cloud provider
167-
func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
168-
return true, cloudprovider.ErrNotImplemented
169-
}
170-
171-
func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
172-
ng, err := scw.nodeGroupForNode(node)
173-
if err != nil {
174-
return 0.0, err
169+
for _, ng := range scw.nodeGroups {
170+
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
171+
return ng, nil
172+
}
175173
}
176174

177-
d := endTime.Sub(startTime)
178-
hours := math.Ceil(d.Hours())
179-
180-
return hours * float64(ng.specs.NodePricePerHour), nil
175+
return nil, nil
181176
}
182177

183-
func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
184-
return 0.0, nil
178+
// HasInstance returns whether the node has corresponding instance in cloud provider,
179+
// true if the node has an instance, false if it no longer exists
180+
func (scw *scalewayCloudProvider) HasInstance(node *apiv1.Node) (bool, error) {
181+
return node.Spec.ProviderID != "", nil
185182
}
186183

187-
// Pricing return pricing model for scaleway.
184+
// Pricing returns pricing model for this cloud provider or error if not available.
185+
// Implementation optional.
188186
func (scw *scalewayCloudProvider) Pricing() (cloudprovider.PricingModel, ca_errors.AutoscalerError) {
189187
klog.V(4).Info("Pricing,called")
190188
return scw, nil
191189
}
192190

193-
// GetAvailableMachineTypes get all machine types that can be requested from scaleway.
194-
// Not implemented
191+
// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider.
192+
// Implementation optional.
195193
func (scw *scalewayCloudProvider) GetAvailableMachineTypes() ([]string, error) {
196194
return []string{}, nil
197195
}
198196

197+
// NewNodeGroup builds a theoretical node group based on the node definition provided. The node group is not automatically
198+
// created on the cloud provider side. The node group is not returned by NodeGroups() until it is created.
199+
// Implementation optional.
199200
func (scw *scalewayCloudProvider) NewNodeGroup(
200201
machineType string,
201202
labels map[string]string,
@@ -220,7 +221,6 @@ func (scw *scalewayCloudProvider) GPULabel() string {
220221
}
221222

222223
// GetAvailableGPUTypes returns all available GPU types the cloud provider supports.
223-
// not yet implemented.
224224
func (scw *scalewayCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
225225
klog.V(4).Info("GetAvailableGPUTypes,called")
226226
return nil
@@ -244,36 +244,98 @@ func (scw *scalewayCloudProvider) Cleanup() error {
244244
func (scw *scalewayCloudProvider) Refresh() error {
245245
klog.V(4).Info("Refresh,ClusterID=", scw.clusterID)
246246

247-
ctx := context.Background()
248-
resp, err := scw.client.ListPools(ctx, &scalewaygo.ListPoolsRequest{ClusterID: scw.clusterID})
247+
// Only skip refresh if lastRefresh is non-zero and interval has not elapsed
248+
if !scw.lastRefresh.IsZero() && time.Since(scw.lastRefresh) < scw.refreshInterval {
249+
klog.V(4).Infof("Refresh,ClusterID=%s,skipping refresh, last refresh was %s ago", scw.clusterID, time.Since(scw.lastRefresh))
250+
return scw.lastRefreshError
251+
}
249252

253+
cc, pools, err := scw.client.ListPools(context.Background(), scw.clusterID)
250254
if err != nil {
251255
klog.Errorf("Refresh,failed to list pools for cluster %s: %s", scw.clusterID, err)
256+
scw.lastRefresh = time.Now()
257+
scw.lastRefreshError = err
252258
return err
253259
}
260+
// Update refresh interval based on Cache-Control header from listPools response
261+
scw.refreshInterval = cc
254262

255-
var ng []*NodeGroup
256-
257-
for _, p := range resp.Pools {
263+
_, nodes, err := scw.client.ListNodes(context.Background(), scw.clusterID)
264+
if err != nil {
265+
klog.Errorf("Refresh,failed to list nodes for cluster %s: %s", scw.clusterID, err)
266+
scw.lastRefresh = time.Now()
267+
scw.lastRefreshError = err
268+
return err
269+
}
258270

259-
if p.Pool.Autoscaling == false {
271+
// Build NodeGroups
272+
nodeGroups := make(map[string]*NodeGroup)
273+
for _, pool := range pools {
274+
if !pool.Autoscaling {
275+
klog.V(4).Infof("Refresh,ClusterID=%s,skipping pool %s (autoscaling disabled)", scw.clusterID, pool.ID)
260276
continue
261277
}
262278

263-
nodes, err := nodesFromPool(scw.client, p.Pool)
264-
if err != nil {
265-
return fmt.Errorf("Refresh,failed to list nodes for pool %s: %w", p.Pool.ID, err)
266-
}
267-
ng = append(ng, &NodeGroup{
279+
nodeGroup := &NodeGroup{
268280
Client: scw.client,
269-
nodes: nodes,
270-
specs: &p.Specs,
271-
p: p.Pool,
272-
})
281+
nodes: make(map[string]*scalewaygo.Node),
282+
pool: pool,
283+
}
284+
285+
nodeGroups[pool.ID] = nodeGroup
273286
}
274-
klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(ng))
275287

276-
scw.nodeGroups = ng
288+
// Assign nodes to NodeGroups
289+
for _, node := range nodes {
290+
_, ok := nodeGroups[node.PoolID]
291+
if !ok {
292+
klog.V(4).Infof("Refresh,ClusterID=%s,node %s found for PoolID=%s which does not exist in nodeGroups, skipping", scw.clusterID, node.ProviderID, node.PoolID)
293+
continue
294+
}
295+
296+
nodeGroups[node.PoolID].nodes[node.ProviderID] = &node
297+
}
298+
299+
scw.nodeGroups = nodeGroups
300+
301+
// Pre-convert nodeGroups map to slice for NodeGroups() method
302+
// This is to avoid converting the map to a slice on every call to NodeGroups()
303+
// which happens quite often
304+
scw.providerNodeGroups = make([]cloudprovider.NodeGroup, 0, len(nodeGroups))
305+
for _, ng := range nodeGroups {
306+
scw.providerNodeGroups = append(scw.providerNodeGroups, ng)
307+
}
308+
309+
klog.V(4).Infof("Refresh,ClusterID=%s,%d pools found", scw.clusterID, len(nodeGroups))
310+
311+
scw.lastRefresh = time.Now()
312+
scw.lastRefreshError = nil
277313

278314
return nil
279315
}
316+
317+
// NodePrice returns a price of running the given node for a given period of time.
318+
// All prices returned by the structure should be in the same currency.
319+
func (scw *scalewayCloudProvider) NodePrice(node *apiv1.Node, startTime time.Time, endTime time.Time) (float64, error) {
320+
var nodeGroup *NodeGroup
321+
for _, ng := range scw.nodeGroups {
322+
if _, ok := ng.nodes[node.Spec.ProviderID]; ok {
323+
nodeGroup = ng
324+
}
325+
}
326+
327+
if nodeGroup == nil {
328+
return 0.0, fmt.Errorf("node group not found for node %s", node.Spec.ProviderID)
329+
}
330+
331+
d := endTime.Sub(startTime)
332+
hours := math.Ceil(d.Hours())
333+
334+
return hours * float64(nodeGroup.pool.NodePricePerHour), nil
335+
}
336+
337+
// PodPrice returns a theoretical minimum price of running a pod for a given
338+
// period of time on a perfectly matching machine.
339+
func (scw *scalewayCloudProvider) PodPrice(pod *apiv1.Pod, startTime time.Time, endTime time.Time) (float64, error) {
340+
return 0.0, nil
341+
}

0 commit comments

Comments
 (0)