Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -148,12 +149,28 @@ func (ctrl *ApplicationController) watchClusterResources(ctx context.Context, it
}
}()
config := item.RESTConfig()
ch, err := kube.WatchResourcesWithLabel(ctx, config, "", common.LabelApplicationName)
watchStartTime := time.Now()
ch, err := ctrl.kubectl.WatchResources(ctx, config, "", func(gvk schema.GroupVersionKind) metav1.ListOptions {
ops := metav1.ListOptions{}
if !kube.IsCRDGroupVersionKind(gvk) {
ops.LabelSelector = common.LabelApplicationName
}
return ops
})

if err != nil {
return err
}
for event := range ch {
eventObj := event.Object.(*unstructured.Unstructured)
if kube.IsCRD(eventObj) {
// restart if new CRD has been created after watch started
if event.Type == watch.Added && watchStartTime.Before(eventObj.GetCreationTimestamp().Time) {
return fmt.Errorf("Restarting the watch because a new CRD was added.")
} else if event.Type == watch.Deleted {
return fmt.Errorf("Restarting the watch because a CRD was deleted.")
}
}
objLabels := eventObj.GetLabels()
if objLabels == nil {
objLabels = make(map[string]string)
Expand Down
11 changes: 11 additions & 0 deletions controller/sync_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package controller

import (
"context"
"fmt"
"sort"
"testing"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/util/kube"
log "github.com/sirupsen/logrus"
Expand All @@ -25,6 +29,13 @@ type kubectlOutput struct {

type mockKubectlCmd struct {
commands map[string]kubectlOutput
events chan watch.Event
}

func (k mockKubectlCmd) WatchResources(
ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) v1.ListOptions) (chan watch.Event, error) {

return k.events, nil
}

func (k mockKubectlCmd) DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error {
Expand Down
68 changes: 68 additions & 0 deletions util/kube/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package kube

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os/exec"
"strings"
"sync"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
Expand All @@ -21,10 +25,74 @@ type Kubectl interface {
ApplyResource(config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRun, force bool) (string, error)
ConvertToVersion(obj *unstructured.Unstructured, group, version string) (*unstructured.Unstructured, error)
DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error
WatchResources(ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) metav1.ListOptions) (chan watch.Event, error)
}

type KubectlCmd struct{}

// WatchResources Watches all the existing resources with the provided label name in the provided namespace in the cluster provided by the config
func (k KubectlCmd) WatchResources(
ctx context.Context, config *rest.Config, namespace string, selector func(kind schema.GroupVersionKind) metav1.ListOptions) (chan watch.Event, error) {

log.Infof("Start watching for resources changes with in cluster %s", config.Host)
dynClientPool := dynamic.NewDynamicClientPool(config)
disco, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
}
serverResources, err := GetCachedServerResources(config.Host, disco)
if err != nil {
return nil, err
}

items := make([]struct {
resource dynamic.ResourceInterface
gvk schema.GroupVersionKind
}, 0)
for _, apiResourcesList := range serverResources {
for i := range apiResourcesList.APIResources {
apiResource := apiResourcesList.APIResources[i]
watchSupported := false
for _, verb := range apiResource.Verbs {
if verb == watchVerb {
watchSupported = true
break
}
}
if watchSupported && !isExcludedResourceGroup(apiResource) {
dclient, err := dynClientPool.ClientForGroupVersionKind(schema.FromAPIVersionAndKind(apiResourcesList.GroupVersion, apiResource.Kind))
if err != nil {
return nil, err
}
items = append(items, struct {
resource dynamic.ResourceInterface
gvk schema.GroupVersionKind
}{resource: dclient.Resource(&apiResource, namespace), gvk: schema.FromAPIVersionAndKind(apiResourcesList.GroupVersion, apiResource.Kind)})
}
}
}
ch := make(chan watch.Event)
go func() {
var wg sync.WaitGroup
wg.Add(len(items))
for i := 0; i < len(items); i++ {
item := items[i]
go func() {
defer wg.Done()
w, err := item.resource.Watch(selector(item.gvk))
if err == nil {
defer w.Stop()
copyEventsChannel(ctx, w.ResultChan(), ch)
}
}()
}
wg.Wait()
close(ch)
log.Infof("Stop watching for resources changes with in cluster %s", config.ServerName)
}()
return ch, nil
}

// DeleteResource deletes resource
func (k KubectlCmd) DeleteResource(config *rest.Config, obj *unstructured.Unstructured, namespace string) error {
dynClientPool := dynamic.NewDynamicClientPool(config)
Expand Down