Skip to content

Commit

Permalink
Improve hpa creation (#1793)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirdavid1 authored Nov 20, 2024
1 parent 733b4a8 commit 6001500
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 82 deletions.
2 changes: 1 addition & 1 deletion autoscaler/controllers/collectorsgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (r *CollectorsGroupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
logger := log.FromContext(ctx)
logger.V(0).Info("Reconciling CollectorsGroup")

err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config.MetricsServerEnabled)
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllerconfig

import "k8s.io/apimachinery/pkg/util/version"

// ControllerConfig carries cluster-level settings that main builds once and
// hands to the autoscaler reconcilers, which forward it to gateway.Sync.
type ControllerConfig struct {
// MetricsServerEnabled presumably records whether a metrics-server
// deployment was detected in the cluster — TODO confirm it is still populated.
MetricsServerEnabled bool
// K8sVersion is the version passed to syncHPA, which compares it against
// 1.23.0 and 1.25.0 to pick the autoscaling API version for the gateway HPA.
K8sVersion *version.Version
}
2 changes: 1 addition & 1 deletion autoscaler/controllers/destination_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type DestinationReconciler struct {
func (r *DestinationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
logger.V(0).Info("Reconciling Destination")
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config.MetricsServerEnabled)
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
162 changes: 119 additions & 43 deletions autoscaler/controllers/gateway/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"

"sigs.k8s.io/controller-runtime/pkg/log"

"github.com/odigos-io/odigos/k8sutils/pkg/consts"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
autoscaling "k8s.io/api/autoscaling/v2"
autoscalingv2 "k8s.io/api/autoscaling/v2"
autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
Expand All @@ -29,59 +30,134 @@ var (
stabilizationWindowSeconds = intPtr(300) // cooldown period for scaling down
)

func syncHPA(gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations) error {
func syncHPA(gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, memConfig *memoryConfigurations, kubeVersion *version.Version) error {
logger := log.FromContext(ctx)

var hpa client.Object

memLimit := memConfig.gomemlimitMiB * memoryLimitPercentageForHPA / 100.0
metricQuantity := resource.MustParse(fmt.Sprintf("%dMi", memLimit))
hpa := &autoscaling.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v2",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: metav1.ObjectMeta{
Name: gateway.Name,
Namespace: gateway.Namespace,
},
Spec: autoscaling.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscaling.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: gateway.Name,

switch {
case kubeVersion.LessThan(version.MustParse("1.23.0")):
hpa = &autoscalingv2beta1.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v2beta1",
Kind: "HorizontalPodAutoscaler",
},
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
Metrics: []autoscaling.MetricSpec{
{
Type: autoscaling.ResourceMetricSourceType,
Resource: &autoscaling.ResourceMetricSource{
Name: "memory",
Target: autoscaling.MetricTarget{
Type: autoscaling.AverageValueMetricType,
AverageValue: &metricQuantity,
ObjectMeta: buildHPACommonFields(gateway),
Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: consts.OdigosClusterCollectorDeploymentName,
},
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
Metrics: []autoscalingv2beta1.MetricSpec{
{
Type: autoscalingv2beta1.ResourceMetricSourceType,
Resource: &autoscalingv2beta1.ResourceMetricSource{
Name: "memory",
TargetAverageValue: &metricQuantity,
},
},
},
},
Behavior: &autoscaling.HorizontalPodAutoscalerBehavior{
ScaleDown: &autoscaling.HPAScalingRules{
StabilizationWindowSeconds: stabilizationWindowSeconds,
}
case kubeVersion.LessThan(version.MustParse("1.25.0")):
hpa = &autoscalingv2beta2.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v2beta2",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: buildHPACommonFields(gateway),
Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: consts.OdigosClusterCollectorDeploymentName,
},
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
Metrics: []autoscalingv2beta2.MetricSpec{
{
Type: autoscalingv2beta2.ResourceMetricSourceType,
Resource: &autoscalingv2beta2.ResourceMetricSource{
Name: "memory",
Target: autoscalingv2beta2.MetricTarget{
Type: autoscalingv2beta2.AverageValueMetricType,
AverageValue: &metricQuantity,
},
},
},
},
Behavior: &autoscalingv2beta2.HorizontalPodAutoscalerBehavior{
ScaleDown: &autoscalingv2beta2.HPAScalingRules{
StabilizationWindowSeconds: stabilizationWindowSeconds,
},
},
},
}
default:
hpa = &autoscalingv2.HorizontalPodAutoscaler{
TypeMeta: metav1.TypeMeta{
APIVersion: "autoscaling/v2",
Kind: "HorizontalPodAutoscaler",
},
ObjectMeta: buildHPACommonFields(gateway),
Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: consts.OdigosClusterCollectorDeploymentName,
},
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
Metrics: []autoscalingv2.MetricSpec{
{
Type: autoscalingv2.ResourceMetricSourceType,
Resource: &autoscalingv2.ResourceMetricSource{
Name: "memory",
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.AverageValueMetricType,
AverageValue: &metricQuantity,
},
},
},
},
Behavior: &autoscalingv2.HorizontalPodAutoscalerBehavior{
ScaleDown: &autoscalingv2.HPAScalingRules{
StabilizationWindowSeconds: stabilizationWindowSeconds,
},
},
},
},
}
}

if err := controllerutil.SetControllerReference(gateway, hpa, scheme); err != nil {
logger.Error(err, "Failed to set controller reference")
return err
}

hpaBytes, _ := yaml.Marshal(hpa)

force := true
patchOptions := client.PatchOptions{
// Use the Apply patch strategy
applyOpts := &client.PatchOptions{
FieldManager: "odigos",
Force: &force,
Force: pointer.Bool(true),
}

if err := c.Patch(ctx, hpa, client.Apply, applyOpts); err != nil {
logger.Error(err, "Failed to apply patch to HPA")
return err
}

return c.Patch(ctx, hpa, client.RawPatch(types.ApplyPatchType, hpaBytes), &patchOptions)
logger.Info("Successfully applied HPA", "name", consts.OdigosClusterCollectorDeploymentName, "namespace", gateway.Namespace)
return nil
}

// buildHPACommonFields returns the object metadata used by every HPA variant
// built in syncHPA: the name is always the cluster collector deployment name,
// and the namespace is taken from the given gateway CollectorsGroup.
func buildHPACommonFields(gateway *odigosv1.CollectorsGroup) metav1.ObjectMeta {
	var meta metav1.ObjectMeta
	meta.Name = consts.OdigosClusterCollectorDeploymentName
	meta.Namespace = gateway.Namespace
	return meta
}
15 changes: 7 additions & 8 deletions autoscaler/controllers/gateway/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
controllerconfig "github.com/odigos-io/odigos/autoscaler/controllers/controller_config"
odigoscommon "github.com/odigos-io/odigos/common"
k8sconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
Expand All @@ -22,7 +23,7 @@ var (
)

func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string,
metricsServerExists bool) error {
config *controllerconfig.ControllerConfig) error {
logger := log.FromContext(ctx)

odigosNs := env.GetCurrentNamespace()
Expand Down Expand Up @@ -55,7 +56,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme,
}

err = syncGateway(&dests, &processors, &gatewayCollectorGroup, ctx, k8sClient, scheme, imagePullSecrets, odigosVersion, &odigosConfig,
metricsServerExists)
config)
statusPatchString := commonconf.GetCollectorsGroupDeployedConditionsPatch(err)
statusErr := k8sClient.Status().Patch(ctx, &gatewayCollectorGroup, client.RawPatch(types.MergePatchType, []byte(statusPatchString)))
if statusErr != nil {
Expand All @@ -68,7 +69,7 @@ func Sync(ctx context.Context, k8sClient client.Client, scheme *runtime.Scheme,
func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.ProcessorList,
gateway *odigosv1.CollectorsGroup, ctx context.Context,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigosConfig *odigoscommon.OdigosConfiguration,
metricsServerExists bool) error {
config *controllerconfig.ControllerConfig) error {
logger := log.FromContext(ctx)
logger.V(0).Info("Syncing gateway")

Expand Down Expand Up @@ -104,11 +105,9 @@ func syncGateway(dests *odigosv1.DestinationList, processors *odigosv1.Processor
return err
}

if metricsServerExists {
err = syncHPA(gateway, ctx, c, scheme, memConfig)
if err != nil {
logger.Error(err, "Failed to sync HPA")
}
err = syncHPA(gateway, ctx, c, scheme, memConfig, config.K8sVersion)
if err != nil {
logger.Error(err, "Failed to sync HPA")
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion autoscaler/controllers/odigosconfig_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (r *OdigosConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request
logger := log.FromContext(ctx)
logger.V(0).Info("Reconciling Odigos Configuration")

err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config.MetricsServerEnabled)
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion autoscaler/controllers/processor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *ProcessorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
logger := log.FromContext(ctx)
logger.V(0).Info("Reconciling Processor")

err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config.MetricsServerEnabled)
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion autoscaler/controllers/secret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (r *SecretReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
logger := log.FromContext(ctx)
logger.V(0).Info("Reconciling Secret")

err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config.MetricsServerEnabled)
err := gateway.Sync(ctx, r.Client, r.Scheme, r.ImagePullSecrets, r.OdigosVersion, r.Config)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
4 changes: 2 additions & 2 deletions autoscaler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ require (
k8s.io/api v0.31.0
k8s.io/apimachinery v0.31.0
k8s.io/client-go v0.31.0
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
sigs.k8s.io/controller-runtime v0.19.0
sigs.k8s.io/yaml v1.4.0
)

require (
Expand Down Expand Up @@ -86,9 +86,9 @@ require (
k8s.io/apiextensions-apiserver v0.31.0 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240430033511-f0e62f92d13f // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace (
Expand Down
25 changes: 2 additions & 23 deletions autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"os"
"strings"

apierrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/odigos-io/odigos/k8sutils/pkg/env"
odigosver "github.com/odigos-io/odigos/k8sutils/pkg/version"

Expand All @@ -34,17 +32,16 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"

"github.com/go-logr/logr"
"github.com/go-logr/zapr"
bridge "github.com/odigos-io/opentelemetry-zap-bridge"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

_ "k8s.io/client-go/plugin/pkg/client/auth"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -188,7 +185,7 @@ func main() {
_, disableNameProcessor := os.LookupEnv("DISABLE_NAME_PROCESSOR")

config := &controllerconfig.ControllerConfig{
MetricsServerEnabled: isMetricsServerInstalled(mgr, setupLog),
K8sVersion: k8sVersion,
}

if err = (&controllers.DestinationReconciler{
Expand Down Expand Up @@ -292,21 +289,3 @@ func main() {
os.Exit(1)
}
}

// isMetricsServerInstalled reports whether a Deployment named "metrics-server"
// can be fetched from the "kube-system" namespace. Any lookup error, including
// not-found, is logged and treated as "not installed".
func isMetricsServerInstalled(mgr ctrl.Manager, logger logr.Logger) bool {
	key := types.NamespacedName{Name: "metrics-server", Namespace: "kube-system"}
	deployment := appsv1.Deployment{}

	// Query through the manager's APIReader rather than its default client:
	// the metrics-server deployment is not expected to be in the manager's
	// cache, so the lookup has to go to the API server directly.
	err := mgr.GetAPIReader().Get(context.TODO(), key, &deployment)
	switch {
	case err == nil:
		logger.V(0).Info("Metrics server found")
		return true
	case apierrors.IsNotFound(err):
		logger.Info("Metrics-server deployment not found")
	default:
		logger.Error(err, "Failed to get metrics-server deployment")
	}
	return false
}

0 comments on commit 6001500

Please sign in to comment.