diff --git a/Gopkg.lock b/Gopkg.lock index c42c6640f27c6..9f812de8a563e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -201,8 +201,8 @@ [[projects]] name = "github.com/json-iterator/go" packages = ["."] - revision = "28452fcdec4e44348d2af0d91d1e9e38da3a9e0a" - version = "1.0.5" + revision = "e7c7f3b33712573affdcc7a107218e7926b9a05b" + version = "1.0.6" [[projects]] name = "github.com/juju/ratelimit" @@ -260,7 +260,7 @@ branch = "master" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - revision = "650f4a345ab4e5b245a3034b110ebc7299e68186" + revision = "432090b8f568c018896cd8a0fb0345872bbac6ce" [[projects]] branch = "master" @@ -275,7 +275,7 @@ "lex/httplex", "trace" ] - revision = "f5dfe339be1d06f81b22525fe34671ee7d2c8904" + revision = "cbe0f9307d0156177f9dd5dc85da1a31abc5f2fb" [[projects]] branch = "master" @@ -327,7 +327,7 @@ "go/ast/astutil", "imports" ] - revision = "ce871d178848e3eea1e8795e5cfb74053dde4bb9" + revision = "5e776fee60db37e560cee3fb46db699d2f095386" [[projects]] name = "google.golang.org/appengine" @@ -366,6 +366,7 @@ "connectivity", "credentials", "encoding", + "encoding/proto", "grpclb/grpc_lb_v1/messages", "grpclog", "internal", @@ -381,8 +382,8 @@ "tap", "transport" ] - revision = "6b51017f791ae1cfbec89c52efdf444b13b550ef" - version = "v1.9.2" + revision = "8e4536a86ab602859c20df5ebfd0bd4228d08655" + version = "v1.10.0" [[projects]] name = "gopkg.in/inf.v0" @@ -441,7 +442,7 @@ "pkg/client/clientset/clientset/scheme", "pkg/client/clientset/clientset/typed/apiextensions/v1beta1" ] - revision = "c726b9ed6b50a8e7247ea43a9087b06b5ed40379" + revision = "b89f5ce12ce6e022fc3e9d7586d61346e694d56e" [[projects]] branch = "release-1.9" @@ -546,7 +547,8 @@ "util/flowcontrol", "util/homedir", "util/integer", - "util/jsonpath" + "util/jsonpath", + "util/workqueue" ] revision = "9389c055a838d4f208b699b3c7c51b70f2368861" @@ -576,11 +578,11 @@ branch = "master" name = "k8s.io/kube-openapi" packages = ["pkg/common"] - revision = "275e2ce91dec4c05a4094a7b1daee5560b555ac9" + revision = "50ae88d24ede7b8bad68e23c805b5d3da5c8abaf" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "f952799b75f7db46976f7e4aa15d5f9b1908f28fbd4d9ddb20381a5f160cc07e" + inputs-digest = "0aa93ce59362fb4c878e843ba8ebed06d419f8aa29012f099a213aded7b4940c" solver-name = "gps-cdcl" solver-version = 1 diff --git a/application/controller/controller.go b/application/controller/controller.go new file mode 100644 index 0000000000000..8b39d14b83d55 --- /dev/null +++ b/application/controller/controller.go @@ -0,0 +1,107 @@ +package controller + +import ( + "context" + + appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned" + appinformers "github.com/argoproj/argo-cd/pkg/client/informers/externalversions" + log "github.com/sirupsen/logrus" + + "time" + + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +const ( + appResyncPeriod = 10 * time.Minute +) + +// ApplicationController is the controller for application resources. +type ApplicationController struct { + kubeclientset kubernetes.Interface + applicationclientset appclientset.Interface + appQueue workqueue.RateLimitingInterface + + appInformer cache.SharedIndexInformer +} + +// NewApplicationController creates new instance of ApplicationController. +func NewApplicationController(kubeclientset kubernetes.Interface, applicationclientset appclientset.Interface) *ApplicationController { + appQueue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + return &ApplicationController{ + kubeclientset: kubeclientset, + applicationclientset: applicationclientset, + appQueue: appQueue, + appInformer: newApplicationInformer(applicationclientset, appQueue), + } +} + +// Run starts the Application CRD controller. +func (ctrl *ApplicationController) Run(ctx context.Context, appWorkers int) { + defer runtime.HandleCrash() + defer ctrl.appQueue.ShutDown() + + go ctrl.appInformer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), ctrl.appInformer.HasSynced) { + log.Error("Timed out waiting for caches to sync") + return + } + + for i := 0; i < appWorkers; i++ { + go wait.Until(ctrl.runWorker, time.Second, ctx.Done()) + } + + <-ctx.Done() +} + +func (ctrl *ApplicationController) processNextItem() bool { + appKey, shutdown := ctrl.appQueue.Get() + defer ctrl.appQueue.Done(appKey) + if shutdown { + return false + } + return true +} + +func (ctrl *ApplicationController) runWorker() { + for ctrl.processNextItem() { + } +} + +func newApplicationInformer(appclientset appclientset.Interface, appQueue workqueue.RateLimitingInterface) cache.SharedIndexInformer { + appInformerFactory := appinformers.NewSharedInformerFactory( + appclientset, + appResyncPeriod, + ) + informer := appInformerFactory.Argoproj().V1alpha1().Applications().Informer() + informer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + appQueue.Add(key) + } + }, + UpdateFunc: func(old, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + appQueue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + // IndexerInformer uses a delta queue, therefore for deletes we have to use this + // key function. + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + appQueue.Add(key) + } + }, + }, + ) + return informer +} diff --git a/cmd/argocd/application-controller/main.go b/cmd/argocd/application-controller/main.go new file mode 100644 index 0000000000000..f6df2a44ba7ed --- /dev/null +++ b/cmd/argocd/application-controller/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "fmt" + "github.com/argoproj/argo-cd/application/controller" + "github.com/argoproj/argo-cd/cmd/argocd/commands" + appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned" + "github.com/spf13/cobra" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "os" +) + +const ( + // CLIName is the name of the CLI + cliName = "application-controller" +) + +func newCommand() *cobra.Command { + var ( + kubeConfigOverrides clientcmd.ConfigOverrides + kubeConfigPath string + ) + var command = cobra.Command{ + Use: cliName, + Short: "application-controller is a controller to operate on applications CRD", + Run: func(c *cobra.Command, args []string) { + kubeConfig := commands.GetKubeConfig(kubeConfigPath, kubeConfigOverrides) + + kubeClient := kubernetes.NewForConfigOrDie(kubeConfig) + appClient := appclientset.NewForConfigOrDie(kubeConfig) + + appController := controller.NewApplicationController(kubeClient, appClient) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go appController.Run(ctx, 1) + // Wait forever + select {} + }, + } + + command.Flags().StringVar(&kubeConfigPath, "kubeconfig", "", "Path to the config file to use for CLI requests.") + kubeConfigOverrides = clientcmd.ConfigOverrides{} + clientcmd.BindOverrideFlags(&kubeConfigOverrides, command.Flags(), clientcmd.RecommendedConfigOverrideFlags("")) + + return &command +} + +func main() { + if err := newCommand().Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } +} diff --git a/cmd/argocd/commands/common.go b/cmd/argocd/commands/common.go index ce942334b166a..309cac4f7eac6 100644 --- a/cmd/argocd/commands/common.go +++ b/cmd/argocd/commands/common.go @@ -20,7 +20,8 @@ var ( imageTag = "latest" ) -func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config { +// GetKubeClient creates new kubernetes client config using specified config path and config overrides variables +func GetKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest.Config { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() loadingRules.ExplicitPath = configPath clientConfig := clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) @@ -33,8 +34,9 @@ func getKubeConfig(configPath string, overrides clientcmd.ConfigOverrides) *rest return restConfig } -func getKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset { - restConfig := getKubeConfig(configPath, overrides) +// GetKubeClient creates new kubernetes client using specified config path and config overrides variables +func GetKubeClient(configPath string, overrides clientcmd.ConfigOverrides) *kubernetes.Clientset { + restConfig := GetKubeConfig(configPath, overrides) clientset, err := kubernetes.NewForConfig(restConfig) if err != nil { log.Fatal(err) diff --git a/cmd/argocd/commands/install.go b/cmd/argocd/commands/install.go index 51b398aadc17c..eae3a277adf05 100644 --- a/cmd/argocd/commands/install.go +++ b/cmd/argocd/commands/install.go @@ -14,7 +14,6 @@ import ( apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" ) // InstallFlags has all the required parameters for installing Argo CD. @@ -32,10 +31,9 @@ func NewInstallCommand(globalArgs *globalFlags) *cobra.Command { Short: "Install the argocd components", Long: "Install the argocd components", Run: func(c *cobra.Command, args []string) { - client := getKubeClient(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides) - extensionsClient := apiextensionsclient.NewForConfigOrDie(getKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides)) - installAppCRD(client, extensionsClient, installArgs) - installClusterCRD(client, extensionsClient, installArgs) + extensionsClient := apiextensionsclient.NewForConfigOrDie(GetKubeConfig(globalArgs.kubeConfigPath, globalArgs.kubeConfigOverrides)) + installAppCRD(extensionsClient, installArgs) + installClusterCRD(extensionsClient, installArgs) }, } command.Flags().BoolVar(&installArgs.DryRun, "dry-run", false, "print the kubernetes manifests to stdout instead of installing") @@ -43,7 +41,7 @@ func NewInstallCommand(globalArgs *globalFlags) *cobra.Command { return command } -func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) { +func installAppCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) { applicationCRD := apiextensionsv1beta1.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ APIVersion: "apiextensions.k8s.io/v1alpha1", @@ -63,10 +61,10 @@ func installAppCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensi }, }, } - createCRDHelper(clientset, extensionsClient, &applicationCRD, args.DryRun) + createCRDHelper(extensionsClient, &applicationCRD, args.DryRun) } -func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, args InstallFlags) { +func installClusterCRD(extensionsClient *apiextensionsclient.Clientset, args InstallFlags) { clusterCRD := apiextensionsv1beta1.CustomResourceDefinition{ TypeMeta: metav1.TypeMeta{ APIVersion: "apiextensions.k8s.io/v1alpha1", @@ -86,10 +84,10 @@ func installClusterCRD(clientset *kubernetes.Clientset, extensionsClient *apiext }, }, } - createCRDHelper(clientset, extensionsClient, &clusterCRD, args.DryRun) + createCRDHelper(extensionsClient, &clusterCRD, args.DryRun) } -func createCRDHelper(clientset *kubernetes.Clientset, extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) { +func createCRDHelper(extensionsClient *apiextensionsclient.Clientset, crd *apiextensionsv1beta1.CustomResourceDefinition, dryRun bool) { if dryRun { printYAML(crd) return