From 195ba0aa475a051af88ffc531b93261d8996f024 Mon Sep 17 00:00:00 2001 From: DerekFrank Date: Mon, 6 Oct 2025 14:18:26 -0700 Subject: [PATCH] refactor: use common sidecar flags functionality --- cmd/csi-attacher/main.go | 111 +++++------------- .../csi-lib-utils/config/config.go | 39 ++++++ vendor/modules.txt | 1 + 3 files changed, 69 insertions(+), 82 deletions(-) create mode 100644 vendor/github.com/kubernetes-csi/csi-lib-utils/config/config.go diff --git a/cmd/csi-attacher/main.go b/cmd/csi-attacher/main.go index 6a2ae3927..33c54f563 100644 --- a/cmd/csi-attacher/main.go +++ b/cmd/csi-attacher/main.go @@ -31,8 +31,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" utilflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/featuregate" @@ -46,6 +44,7 @@ import ( "k8s.io/klog/v2" "github.com/container-storage-interface/spec/lib/go/csi" + libconfig "github.com/kubernetes-csi/csi-lib-utils/config" "github.com/kubernetes-csi/csi-lib-utils/connection" "github.com/kubernetes-csi/csi-lib-utils/leaderelection" "github.com/kubernetes-csi/csi-lib-utils/metrics" @@ -65,34 +64,17 @@ const ( // Command line flags var ( - kubeconfig = flag.String("kubeconfig", "", "Absolute path to the kubeconfig file. Required only when running out of cluster.") - resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.") - csiAddress = flag.String("csi-address", "/run/csi/socket", "Address of the CSI driver socket.") - showVersion = flag.Bool("version", false, "Show version.") - timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") - workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads") - maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") - - retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed create volume or deletion. It doubles with each failure, up to retry-interval-max.") - retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed create volume or deletion.") - - enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.") - leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.") - leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Duration, in seconds, that non-leader candidates will wait to force acquire leadership. Defaults to 15 seconds.") - leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 10*time.Second, "Duration, in seconds, that the acting leader will retry refreshing leadership before giving up. Defaults to 10 seconds.") - leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 5*time.Second, "Duration, in seconds, the LeaderElector clients should wait between tries of actions. Defaults to 5 seconds.") + resync = flag.Duration("resync", 10*time.Minute, "Resync interval of the controller.") + timeout = flag.Duration("timeout", 15*time.Second, "Timeout for waiting for attaching or detaching the volume.") + retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.") + retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.") + workerThreads = flag.Uint("worker-threads", 10, "Number of attacher worker threads") + maxEntries = flag.Int("max-entries", 0, "Max entries per each page in volume lister call, 0 means no limit.") defaultFSType = flag.String("default-fstype", "", "The default filesystem type of the volume to publish. Defaults to empty string") reconcileSync = flag.Duration("reconcile-sync", 1*time.Minute, "Resync interval of the VolumeAttachment reconciler.") - metricsAddress = flag.String("metrics-address", "", "(deprecated) The TCP network address where the prometheus metrics endpoint will listen (example: `:8080`). The default is empty string, which means metrics endpoint is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - httpEndpoint = flag.String("http-endpoint", "", "The TCP network address where the HTTP server for diagnostics, including metrics and leader election health check, will listen (example: `:8080`). The default is empty string, which means the server is disabled. Only one of `--metrics-address` and `--http-endpoint` can be set.") - metricsPath = flag.String("metrics-path", "/metrics", "The HTTP path where prometheus metrics will be exposed. Default is `/metrics`.") - - kubeAPIQPS = flag.Float64("kube-api-qps", 5, "QPS to use while communicating with the kubernetes apiserver. Defaults to 5.0.") - kubeAPIBurst = flag.Int("kube-api-burst", 10, "Burst to use while communicating with the kubernetes apiserver. Defaults to 10.") - maxGRPCLogLength = flag.Int("max-grpc-log-length", -1, "The maximum amount of characters logged for every grpc responses. Defaults to no limit") featureGates map[string]bool @@ -111,6 +93,7 @@ func main() { c := logsapi.NewLoggingConfiguration() logsapi.AddGoFlags(c, flag.CommandLine) logs.InitLogs() + standardflags.RegisterCommonFlags(flag.CommandLine) standardflags.AddAutomaxprocs(klog.Infof) flag.Parse() logger := klog.Background() @@ -124,29 +107,27 @@ func main() { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if *showVersion { + if standardflags.Configuration.ShowVersion { fmt.Println(os.Args[0], version) return } logger.Info("Version", "version", version) - if *metricsAddress != "" && *httpEndpoint != "" { + if standardflags.Configuration.MetricsAddress != "" && standardflags.Configuration.HttpEndpoint != "" { logger.Error(nil, "Only one of `--metrics-address` and `--http-endpoint` can be set") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - addr := *metricsAddress + addr := standardflags.Configuration.MetricsAddress if addr == "" { - addr = *httpEndpoint + addr = standardflags.Configuration.HttpEndpoint } // Create the client config. Use kubeconfig if given, otherwise assume in-cluster. - config, err := buildConfig(*kubeconfig) + config, err := libconfig.BuildConfig(standardflags.Configuration.KubeConfig, standardflags.Configuration) if err != nil { logger.Error(err, "Failed to build a Kubernetes config") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - config.QPS = (float32)(*kubeAPIQPS) - config.Burst = *kubeAPIBurst config.ContentType = runtime.ContentTypeProtobuf if *workerThreads == 0 { @@ -167,9 +148,9 @@ func main() { // Connect to CSI. connection.SetMaxGRPCLogLength(*maxGRPCLogLength) ctx := context.Background() - csiConn, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + csiConn, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } @@ -194,9 +175,9 @@ func main() { translator := csitrans.New() if translator.IsMigratedCSIDriverByName(csiAttacher) { metricsManager = metrics.NewCSIMetricsManagerWithOptions(csiAttacher, metrics.WithMigration()) - migratedCsiClient, err := connection.Connect(ctx, *csiAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) + migratedCsiClient, err := connection.Connect(ctx, standardflags.Configuration.CSIAddress, metricsManager, connection.OnConnectionLoss(connection.ExitOnConnectionLoss())) if err != nil { - logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", *csiAddress, "migrated", true) + logger.Error(err, "Failed to connect to the CSI driver", "csiAddress", standardflags.Configuration.CSIAddress, "migrated", true) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } csiConn.Close() @@ -216,13 +197,13 @@ func main() { // Prepare http endpoint for metrics + leader election healthz mux := http.NewServeMux() if addr != "" { - metricsManager.RegisterToServer(mux, *metricsPath) + metricsManager.RegisterToServer(mux, standardflags.Configuration.MetricsPath) metricsManager.SetDriverName(csiAttacher) go func() { - logger.Info("ServeMux listening", "address", addr, "metricsPath", *metricsPath) + logger.Info("ServeMux listening", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath) err := http.ListenAndServe(addr, mux) if err != nil { - logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", *metricsPath) + logger.Error(err, "Failed to start HTTP server at specified address and metrics path", "address", addr, "metricsPath", standardflags.Configuration.MetricsPath) klog.FlushAndExit(klog.ExitFlushTimeout, 1) } }() @@ -332,49 +313,15 @@ func main() { } } - if !*enableLeaderElection { - run(klog.NewContext(context.Background(), logger)) - } else { - // Create a new clientset for leader election. When the attacher - // gets busy and its client gets throttled, the leader election - // can proceed without issues. - leClientset, err := kubernetes.NewForConfig(config) - if err != nil { - logger.Error(err, "Failed to create leaderelection client") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - - // Name of config map with leader election lock - lockName := "external-attacher-leader-" + csiAttacher - le := leaderelection.NewLeaderElection(leClientset, lockName, run) - if *httpEndpoint != "" { - le.PrepareHealthCheck(mux, leaderelection.DefaultHealthCheckTimeout) - } - - if *leaderElectionNamespace != "" { - le.WithNamespace(*leaderElectionNamespace) - } - - le.WithLeaseDuration(*leaderElectionLeaseDuration) - le.WithRenewDeadline(*leaderElectionRenewDeadline) - le.WithRetryPeriod(*leaderElectionRetryPeriod) - if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) { - le.WithReleaseOnCancel(true) - le.WithContext(ctx) - } - - if err := le.Run(); err != nil { - logger.Error(err, "Failed to initialize leader election") - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } - } -} - -func buildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - return rest.InClusterConfig() + leaderelection.RunWithLeaderElection( + ctx, + config, + standardflags.Configuration, + run, + "external-attacher-leader-"+csiAttacher, + mux, + utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit), + ) } func supportsControllerCapabilities(ctx context.Context, csiConn *grpc.ClientConn) (bool, bool, bool, bool, error) { diff --git a/vendor/github.com/kubernetes-csi/csi-lib-utils/config/config.go b/vendor/github.com/kubernetes-csi/csi-lib-utils/config/config.go new file mode 100644 index 000000000..f29f85e2e --- /dev/null +++ b/vendor/github.com/kubernetes-csi/csi-lib-utils/config/config.go @@ -0,0 +1,39 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package config + +import ( + "github.com/kubernetes-csi/csi-lib-utils/standardflags" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +func BuildConfig(kubeconfig string, opts standardflags.SidecarConfiguration) (*rest.Config, error) { + config, err := buildConfig(kubeconfig) + if err != nil { + return config, err + } + config.QPS = float32(opts.KubeAPIQPS) + config.Burst = opts.KubeAPIBurst + return config, nil +} + +func buildConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 7608d95de..41b7c3db7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -145,6 +145,7 @@ github.com/json-iterator/go # github.com/kubernetes-csi/csi-lib-utils v0.23.1 ## explicit; go 1.24.6 github.com/kubernetes-csi/csi-lib-utils/accessmodes +github.com/kubernetes-csi/csi-lib-utils/config github.com/kubernetes-csi/csi-lib-utils/connection github.com/kubernetes-csi/csi-lib-utils/leaderelection github.com/kubernetes-csi/csi-lib-utils/metrics