Skip to content

Commit

Permalink
Add QPS related parameters to control the request rate of metrics-ada…
Browse files Browse the repository at this point in the history
…pter to member clusters.

Signed-off-by: chaunceyjiang <[email protected]>
  • Loading branch information
chaunceyjiang committed Apr 16, 2024
1 parent f369c55 commit 6f29134
Show file tree
Hide file tree
Showing 11 changed files with 59 additions and 43 deletions.
8 changes: 4 additions & 4 deletions cmd/agent/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
"Specifies the cluster lease renew interval fraction.")
fs.DurationVar(&o.ClusterSuccessThreshold.Duration, "cluster-success-threshold", 30*time.Second, "The duration of successes for the cluster to be considered healthy after recovery.")
fs.DurationVar(&o.ClusterFailureThreshold.Duration, "cluster-failure-threshold", 30*time.Second, "The duration of failure for the cluster to be considered unhealthy.")
fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.")
fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
fs.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.")
fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.")
Expand Down
4 changes: 2 additions & 2 deletions cmd/aggregated-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.RecommendedOptions.AddFlags(flags)
flags.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file."

flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
_ = utilfeature.DefaultMutableFeatureGate.Add(pkgfeatures.DefaultFeatureGates)
utilfeature.DefaultMutableFeatureGate.AddFlag(flags)
o.ProfileOpts.AddFlags(flags)
Expand Down
8 changes: 4 additions & 4 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
"Note: 'karmada-system', 'karmada-cluster' and 'karmada-es-.*' are Karmada reserved namespaces that will always be skipped.")
flags.StringVar(&o.ClusterAPIContext, "cluster-api-context", "", "Name of the cluster context in cluster-api management cluster kubeconfig file.")
flags.StringVar(&o.ClusterAPIKubeconfig, "cluster-api-kubeconfig", "", "Path to the cluster-api management cluster kubeconfig file.")
flags.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.")
flags.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8080, :8080). It can be set to \"0\" to disable the metrics serving.")
Expand Down
4 changes: 2 additions & 2 deletions cmd/descheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
fs.IntVar(&o.SchedulerEstimatorPort, "scheduler-estimator-port", defaultEstimatorPort, "The secure port on which to connect the accurate scheduler estimator.")
fs.StringVar(&o.SchedulerEstimatorServicePrefix, "scheduler-estimator-service-prefix", "karmada-scheduler-estimator", "The prefix of scheduler estimator service name")
Expand Down
4 changes: 2 additions & 2 deletions cmd/karmada-search/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
o.RecommendedOptions.AddFlags(flags)
flags.Lookup("kubeconfig").Usage = "Path to karmada control plane kubeconfig file."

flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
flags.BoolVar(&o.DisableSearch, "disable-search", false, "Disable search feature that would save memory usage significantly.")
flags.BoolVar(&o.DisableProxy, "disable-proxy", false, "Disable proxy feature that would save memory usage significantly.")

Expand Down
20 changes: 16 additions & 4 deletions cmd/metrics-adapter/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
generatedopenapi "github.com/karmada-io/karmada/pkg/generated/openapi"
"github.com/karmada-io/karmada/pkg/metricsadapter"
"github.com/karmada-io/karmada/pkg/sharedcli/profileflag"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/version"
)

Expand All @@ -42,8 +43,15 @@ type Options struct {
CustomMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions

KubeConfig string

ProfileOpts profileflag.Options
// ClusterAPIQPS is the QPS to use while talking with cluster kube-apiserver.
ClusterAPIQPS float32
// ClusterAPIBurst is the burst to allow while talking with cluster kube-apiserver.
ClusterAPIBurst int
// KubeAPIQPS is the QPS to use while talking with karmada-apiserver.
KubeAPIQPS float32
// KubeAPIBurst is the burst to allow while talking with karmada-apiserver.
KubeAPIBurst int
ProfileOpts profileflag.Options
}

// NewOptions builds a default metrics-adapter options.
Expand All @@ -64,7 +72,10 @@ func (o *Options) Complete() error {
func (o *Options) AddFlags(fs *pflag.FlagSet) {
o.CustomMetricsAdapterServerOptions.AddFlags(fs)
o.ProfileOpts.AddFlags(fs)

fs.Float32Var(&o.ClusterAPIQPS, "cluster-api-qps", 40.0, "QPS to use while talking with cluster kube-apiserver.")
fs.IntVar(&o.ClusterAPIBurst, "cluster-api-burst", 60, "Burst to use while talking with cluster kube-apiserver.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
fs.StringVar(&o.KubeConfig, "kubeconfig", o.KubeConfig, "Path to karmada control plane kubeconfig file.")
}

Expand All @@ -75,12 +86,13 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {
klog.Errorf("Unable to build restConfig: %v", err)
return nil, err
}
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst

karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory)
metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory, &util.ClientOption{QPS: o.ClusterAPIQPS, Burst: o.ClusterAPIBurst})
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIV3Config = genericapiserver.DefaultOpenAPIV3Config(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
Expand Down
4 changes: 2 additions & 2 deletions cmd/scheduler-estimator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.ServerPort, "server-port", defaultServerPort, "The secure port on which to serve gRPC.")
fs.IntVar(&o.SecurePort, "secure-port", defaultHealthzPort, "The secure port on which to serve HTTPS.")
fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.ClusterAPIQPS, "kube-api-qps", 20.0, "QPS to use while talking with apiserver.")
fs.IntVar(&o.ClusterAPIBurst, "kube-api-burst", 30, "Burst to use while talking with apiserver.")
fs.IntVar(&o.Parallelism, "parallelism", o.Parallelism, "Parallelism defines the amount of parallelism in algorithms for estimating. Must be greater than 0. Defaults to 16.")
features.FeatureGate.AddFlag(fs)

Expand Down
4 changes: 2 additions & 2 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server. Overrides any value in KubeConfig. Only required if out-of-cluster.")
fs.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.")
fs.IntVar(&o.SecurePort, "secure-port", defaultPort, "The secure port on which to serve HTTPS.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
fs.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
fs.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
fs.BoolVar(&o.EnableSchedulerEstimator, "enable-scheduler-estimator", false, "Enable calling cluster scheduler estimator for adjusting replicas.")
fs.BoolVar(&o.DisableSchedulerEstimatorInPullMode, "disable-scheduler-estimator-in-pull-mode", false, "Disable the scheduler estimator for clusters in pull mode, which takes effect only when enable-scheduler-estimator is true.")
fs.DurationVar(&o.SchedulerEstimatorTimeout.Duration, "scheduler-estimator-timeout", 3*time.Second, "Specifies the timeout period of calling the scheduler estimator service.")
Expand Down
4 changes: 2 additions & 2 deletions cmd/webhook/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.CertName, "tls-cert-file-name", "tls.crt", "The name of server certificate.")
flags.StringVar(&o.KeyName, "tls-private-key-file-name", "tls.key", "The name of server key.")
flags.StringVar(&o.TLSMinVersion, "tls-min-version", defaultTLSMinVersion, "Minimum TLS version supported. Possible values: 1.0, 1.1, 1.2, 1.3.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
flags.Float32Var(&o.KubeAPIQPS, "kube-api-qps", 40.0, "QPS to use while talking with karmada-apiserver.")
flags.IntVar(&o.KubeAPIBurst, "kube-api-burst", 60, "Burst to use while talking with karmada-apiserver.")
flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics(e.g. 127.0.0.1:8080, :8080). It can be set to \"0\" to disable the metrics serving.")
flags.StringVar(&o.HealthProbeBindAddress, "health-probe-bind-address", ":8000", "The TCP address that the controller should bind to for serving health probes(e.g. 127.0.0.1:8000, :8000)")

Expand Down
24 changes: 12 additions & 12 deletions pkg/metricsadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ type MetricsController struct {
}

// NewMetricsController creates a new metrics controller
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory) *MetricsController {
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) *MetricsController {
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
controller := &MetricsController{
InformerFactory: factory,
ClusterLister: clusterLister,
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory),
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory, clusterClientOption),
InformerManager: genericmanager.GetInstance(),
TypedInformerManager: newInstance(),
restConfig: restConfig,
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *MetricsController) updateCluster(oldObj, curObj interface{}) {
if util.ClusterAccessCredentialChanged(curCluster.Spec, oldCluster.Spec) ||
util.IsClusterReady(&curCluster.Status) != util.IsClusterReady(&oldCluster.Status) {
// Cluster.Spec or Cluster health state is changed, rebuild informer.
m.InformerManager.Stop(curCluster.GetName())
m.stopInformerManager(curCluster.GetName())
m.queue.Add(curCluster.GetName())
}
}
Expand Down Expand Up @@ -213,27 +213,21 @@ func (m *MetricsController) handleClusters() bool {
if err != nil {
if apierrors.IsNotFound(err) {
klog.Infof("try to stop cluster informer %s", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
m.stopInformerManager(clusterName)
return true
}
return false
}

if !cls.DeletionTimestamp.IsZero() {
klog.Infof("try to stop cluster informer %s", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
m.stopInformerManager(clusterName)
return true
}

if !util.IsClusterReady(&cls.Status) {
klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName)
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
m.stopInformerManager(clusterName)
return false
}

Expand Down Expand Up @@ -276,3 +270,9 @@ func (m *MetricsController) handleClusters() bool {

return true
}

func (m *MetricsController) stopInformerManager(clusterName string) {
m.TypedInformerManager.Stop(clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
}
18 changes: 11 additions & 7 deletions pkg/metricsadapter/multiclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,19 @@ type MultiClusterDiscoveryInterface interface {
// MultiClusterDiscovery provides DiscoveryClient for multiple clusters.
type MultiClusterDiscovery struct {
sync.RWMutex
clients map[string]*discovery.DiscoveryClient
secretLister listcorev1.SecretLister
clusterLister clusterlister.ClusterLister
clients map[string]*discovery.DiscoveryClient
clusterClientOption *util.ClientOption
secretLister listcorev1.SecretLister
clusterLister clusterlister.ClusterLister
}

// NewMultiClusterDiscoveryClient returns a new MultiClusterDiscovery
func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory) MultiClusterDiscoveryInterface {
func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory, clusterClientOption *util.ClientOption) MultiClusterDiscoveryInterface {
return &MultiClusterDiscovery{
clusterLister: clusterLister,
secretLister: KubeFactory.Core().V1().Secrets().Lister(),
clients: map[string]*discovery.DiscoveryClient{},
clusterLister: clusterLister,
secretLister: KubeFactory.Core().V1().Secrets().Lister(),
clients: map[string]*discovery.DiscoveryClient{},
clusterClientOption: clusterClientOption,
}
}

Expand All @@ -72,6 +74,8 @@ func (m *MultiClusterDiscovery) Set(clusterName string) error {
if err != nil {
return err
}
clusterConfig.QPS = m.clusterClientOption.QPS
clusterConfig.Burst = m.clusterClientOption.Burst
m.Lock()
defer m.Unlock()
m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig)
Expand Down

0 comments on commit 6f29134

Please sign in to comment.