diff --git a/controller/appcontroller.go b/controller/appcontroller.go index 038d574e9240c..e5469027f5b7e 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -148,12 +149,28 @@ func (ctrl *ApplicationController) watchClusterResources(ctx context.Context, it } }() config := item.RESTConfig() - ch, err := kube.WatchResourcesWithLabel(ctx, config, "", common.LabelApplicationName) + watchStartTime := time.Now() + ch, err := ctrl.kubectl.WatchResources(ctx, config, "", func(gvk schema.GroupVersionKind) metav1.ListOptions { + ops := metav1.ListOptions{} + if !kube.IsCRDGroupVersionKind(gvk) { + ops.LabelSelector = common.LabelApplicationName + } + return ops + }) + if err != nil { return err } for event := range ch { eventObj := event.Object.(*unstructured.Unstructured) + if kube.IsCRD(eventObj) { + // restart if new CRD has been created after watch started + if event.Type == watch.Added && watchStartTime.Before(eventObj.GetCreationTimestamp().Time) { + return fmt.Errorf("restarting the watch because a new CRD was added") + } else if event.Type == watch.Deleted { + return fmt.Errorf("restarting the watch because a CRD was deleted") + } + } objLabels := eventObj.GetLabels() if objLabels == nil { objLabels = make(map[string]string) diff --git a/controller/sync_test.go b/controller/sync_test.go index 7e300031479ea..01dbdcffe5b5f 100644 --- a/controller/sync_test.go +++ b/controller/sync_test.go @@ -1,10 +1,14 @@ package controller import ( + "context" "fmt" "sort" "testing" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" 
"github.com/argoproj/argo-cd/util/kube" log "github.com/sirupsen/logrus" @@ -25,6 +29,13 @@ type kubectlOutput struct { type mockKubectlCmd struct { commands map[string]kubectlOutput + events chan watch.Event +} + +func (k mockKubectlCmd) WatchResources( + ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) v1.ListOptions) (chan watch.Event, error) { + + return k.events, nil } func (k mockKubectlCmd) DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error { diff --git a/util/kube/ctl.go b/util/kube/ctl.go index 9b9d01340850b..9f942bc906086 100644 --- a/util/kube/ctl.go +++ b/util/kube/ctl.go @@ -2,16 +2,20 @@ package kube import ( "bytes" + "context" "encoding/json" "fmt" "io/ioutil" "os/exec" "strings" + "sync" "github.com/pkg/errors" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -21,10 +25,74 @@ type Kubectl interface { ApplyResource(config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRun, force bool) (string, error) ConvertToVersion(obj *unstructured.Unstructured, group, version string) (*unstructured.Unstructured, error) DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error + WatchResources(ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) metav1.ListOptions) (chan watch.Event, error) } type KubectlCmd struct{} +// WatchResources Watches all the existing resources with the provided label name in the provided namespace in the cluster provided by the config +func (k KubectlCmd) WatchResources( + ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) metav1.ListOptions) (chan 
watch.Event, error) { + + log.Infof("Start watching for resource changes in cluster %s", config.Host) + dynClientPool := dynamic.NewDynamicClientPool(config) + disco, err := discovery.NewDiscoveryClientForConfig(config) + if err != nil { + return nil, err + } + serverResources, err := GetCachedServerResources(config.Host, disco) + if err != nil { + return nil, err + } + + items := make([]struct { + resource dynamic.ResourceInterface + gvk schema.GroupVersionKind + }, 0) + for _, apiResourcesList := range serverResources { + for i := range apiResourcesList.APIResources { + apiResource := apiResourcesList.APIResources[i] + watchSupported := false + for _, verb := range apiResource.Verbs { + if verb == watchVerb { + watchSupported = true + break + } + } + if watchSupported && !isExcludedResourceGroup(apiResource) { + dclient, err := dynClientPool.ClientForGroupVersionKind(schema.FromAPIVersionAndKind(apiResourcesList.GroupVersion, apiResource.Kind)) + if err != nil { + return nil, err + } + items = append(items, struct { + resource dynamic.ResourceInterface + gvk schema.GroupVersionKind + }{resource: dclient.Resource(&apiResource, namespace), gvk: schema.FromAPIVersionAndKind(apiResourcesList.GroupVersion, apiResource.Kind)}) + } + } + } + ch := make(chan watch.Event) + go func() { + var wg sync.WaitGroup + wg.Add(len(items)) + for i := 0; i < len(items); i++ { + item := items[i] + go func() { + defer wg.Done() + w, err := item.resource.Watch(selector(item.gvk)) + if err == nil { + defer w.Stop() + copyEventsChannel(ctx, w.ResultChan(), ch) + } + }() + } + wg.Wait() + close(ch) + log.Infof("Stop watching for resource changes in cluster %s", config.Host) + }() + return ch, nil +} + // DeleteResource deletes resource func (k KubectlCmd) DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error { dynClientPool := dynamic.NewDynamicClientPool(config)