Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
informer.WithDeleteHandler[*v1alpha1.Application](a.addAppDeletionToQueue),
informer.WithFilters[*v1alpha1.Application](a.DefaultAppFilterChain()),
informer.WithNamespaceScope[*v1alpha1.Application](a.namespace),
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
}

appProjectManagerOption := []appproject.AppProjectManagerOption{
Expand Down Expand Up @@ -227,6 +228,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
informer.WithAddHandler[*v1alpha1.AppProject](a.addAppProjectCreationToQueue),
informer.WithUpdateHandler[*v1alpha1.AppProject](a.addAppProjectUpdateToQueue),
informer.WithDeleteHandler[*v1alpha1.AppProject](a.addAppProjectDeletionToQueue),
informer.WithGroupResource[*v1alpha1.AppProject]("argoproj.io", "appprojects"),
}

projInformer, err := informer.NewInformer(ctx, projInformerOptions...)
Expand Down Expand Up @@ -263,6 +265,7 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
informer.WithUpdateHandler[*corev1.Secret](a.handleRepositoryUpdate),
informer.WithDeleteHandler[*corev1.Secret](a.handleRepositoryDeletion),
informer.WithFilters(kuberepository.DefaultFilterChain(a.namespace)),
informer.WithGroupResource[*corev1.Secret]("", "secrets"),
}

repoInformer, err := informer.NewInformer(ctx, repoInformerOptions...)
Expand Down
1 change: 1 addition & 0 deletions internal/argocd/cluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func NewManager(ctx context.Context, namespace, redisAddress, redisPassword stri
informer.WithUpdateHandler(m.onClusterUpdated),
informer.WithDeleteHandler(m.onClusterDeleted),
informer.WithFilters(m.filters),
informer.WithGroupResource[*v1.Secret]("", "secrets"),
)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions internal/backend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
corev1 "k8s.io/api/core/v1"
)

type ContextKey string

const ForUpdateContextKey ContextKey = "forUpdate"

type ApplicationSelector struct {

// Labels is not currently implemented.
Expand Down
26 changes: 25 additions & 1 deletion internal/backend/kubernetes/application/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
appclientset "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)

var _ backend.Application = &KubernetesBackend{}
Expand All @@ -43,18 +44,24 @@ type KubernetesBackend struct {
appClient appclientset.Interface
// appInformer is used to watch for change events for Argo CD Application resources on the cluster
appInformer informer.InformerInterface
// appLister is used to list Argo CD Application resources from the cache
appLister cache.GenericLister
// namespace is not currently read, is not guaranteed to be non-empty, and is not guaranteed to contain the source of Argo CD Application CRs in all cases
namespace string
usePatch bool
}

func NewKubernetesBackend(appClient appclientset.Interface, namespace string, appInformer informer.InformerInterface, usePatch bool) *KubernetesBackend {
return &KubernetesBackend{
be := &KubernetesBackend{
appClient: appClient,
appInformer: appInformer,
usePatch: usePatch,
namespace: namespace,
}
if specificInformer, ok := appInformer.(*informer.Informer[*v1alpha1.Application]); ok {
be.appLister = specificInformer.Lister()
}
return be
}

func (be *KubernetesBackend) List(ctx context.Context, selector backend.ApplicationSelector) ([]v1alpha1.Application, error) {
Expand Down Expand Up @@ -82,6 +89,23 @@ func (be *KubernetesBackend) Create(ctx context.Context, app *v1alpha1.Applicati
}

func (be *KubernetesBackend) Get(ctx context.Context, name string, namespace string) (*v1alpha1.Application, error) {
forUpdate, _ := ctx.Value(backend.ForUpdateContextKey).(bool)

if !forUpdate && be.appLister != nil && be.appInformer != nil && be.appInformer.HasSynced() {
Comment on lines +92 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain this construct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please take a look at my explanation below?

namespaceLister := be.appLister.ByNamespace(namespace)
if namespaceLister != nil {
obj, err := namespaceLister.Get(name)
if err != nil {
return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
}
app, ok := obj.(*v1alpha1.Application)
if !ok {
return nil, fmt.Errorf("object is not an Application: %T", obj)
}
return app.DeepCopy(), nil
}
}

return be.appClient.ArgoprojV1alpha1().Applications(namespace).Get(ctx, name, v1.GetOptions{})
}

Expand Down
114 changes: 110 additions & 4 deletions internal/backend/kubernetes/application/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
"testing"

"github.com/argoproj-labs/argocd-agent/internal/backend"
"github.com/argoproj-labs/argocd-agent/internal/informer"
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
fakeappclient "github.com/argoproj/argo-cd/v3/pkg/client/clientset/versioned/fake"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wI2L/jsondiff"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)

func Test_NewKubernetes(t *testing.T) {
Expand Down Expand Up @@ -110,22 +115,123 @@ func Test_Create(t *testing.T) {

func Test_Get(t *testing.T) {
apps := mkApps()
ctx := context.TODO()
t.Run("Get existing app", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
k := NewKubernetesBackend(fakeAppC, "", nil, true)
app, err := k.Get(context.TODO(), "app", "ns1")

inf, err := informer.NewInformer[*v1alpha1.Application](
ctx,
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
}),
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
}),
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
)
require.NoError(t, err)

go inf.Start(ctx)
require.NoError(t, inf.WaitForSync(ctx))

// Create the backend with the informer
backend := NewKubernetesBackend(fakeAppC, "", inf, true)

app, err := backend.Get(ctx, "app", "ns1")
assert.NoError(t, err)
assert.NotNil(t, app)
assert.Equal(t, "app", app.Name)
assert.Equal(t, "ns1", app.Namespace)

})
t.Run("Get non-existing app", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset(apps...)
k := NewKubernetesBackend(fakeAppC, "", nil, true)
app, err := k.Get(context.TODO(), "foo", "ns1")
inf, err := informer.NewInformer[*v1alpha1.Application](
ctx,
informer.WithListHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (runtime.Object, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").List(ctx, options)
}),
informer.WithWatchHandler[*v1alpha1.Application](func(ctx context.Context, options v1.ListOptions) (watch.Interface, error) {
return fakeAppC.ArgoprojV1alpha1().Applications("").Watch(ctx, options)
}),
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
)
require.NoError(t, err)
go inf.Start(ctx)
require.NoError(t, inf.WaitForSync(ctx))

backend := NewKubernetesBackend(fakeAppC, "", inf, true)

app, err := backend.Get(ctx, "nonexistent", "ns1")
assert.ErrorContains(t, err, "not found")
assert.Equal(t, &v1alpha1.Application{}, app)

})

t.Run("Get returns type assertion error for invalid object", func(t *testing.T) {
fakeAppC := fakeappclient.NewSimpleClientset()

mockInf := &mockInformerWithInvalidType{}

backend := &KubernetesBackend{
appClient: fakeAppC,
appInformer: mockInf,
appLister: mockInf.Lister(),
}

app, err := backend.Get(ctx, "test", "ns1")
require.Error(t, err)
require.Nil(t, app)
assert.Contains(t, err.Error(), "object is not an Application")
})
}

type mockInformerWithInvalidType struct{}

func (m *mockInformerWithInvalidType) Start(ctx context.Context) error {
return nil
}

func (m *mockInformerWithInvalidType) WaitForSync(ctx context.Context) error {
return nil
}

func (m *mockInformerWithInvalidType) HasSynced() bool {
return true
}

func (m *mockInformerWithInvalidType) Stop() error {
return nil
}

func (m *mockInformerWithInvalidType) Lister() cache.GenericLister {
return &mockListerWithInvalidType{}
}

type mockListerWithInvalidType struct{}

func (m *mockListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
return nil, nil
}

func (m *mockListerWithInvalidType) Get(name string) (runtime.Object, error) {
return &corev1.ConfigMap{}, nil
}

func (m *mockListerWithInvalidType) ByNamespace(namespace string) cache.GenericNamespaceLister {
return &mockNamespaceListerWithInvalidType{}
}

type mockNamespaceListerWithInvalidType struct{}

func (m *mockNamespaceListerWithInvalidType) List(selector labels.Selector) ([]runtime.Object, error) {
return nil, nil
}

func (m *mockNamespaceListerWithInvalidType) Get(name string) (runtime.Object, error) {
return &corev1.ConfigMap{}, nil
}

func Test_Delete(t *testing.T) {
apps := mkApps()
t.Run("Delete existing app", func(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions internal/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/sirupsen/logrus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -57,6 +58,9 @@ type Informer[T runtime.Object] struct {

resType reflect.Type

// groupResource is the group and resource of the watched objects.
groupResource schema.GroupResource

// logger is this informer's logger.
logger *logrus.Entry

Expand Down Expand Up @@ -109,6 +113,7 @@ func NewInformer[T runtime.Object](ctx context.Context, opts ...InformerOption[T
i := &Informer[T]{}
var r T
i.resType = reflect.TypeOf(r)

i.logger = logrus.NewEntry(logrus.StandardLogger()).WithFields(logrus.Fields{
"type": i.resType,
"module": "Informer",
Expand Down Expand Up @@ -287,3 +292,8 @@ func (i *Informer[T]) WaitForSync(ctx context.Context) error {
}
return nil
}

// Lister returns a GenericLister that can be used to list and get cached resources.
func (i *Informer[T]) Lister() cache.GenericLister {
return cache.NewGenericLister(i.informer.GetIndexer(), i.groupResource)
}
34 changes: 34 additions & 0 deletions internal/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)
Expand Down Expand Up @@ -233,6 +234,39 @@ func Test_InformerScope(t *testing.T) {

}

func Test_Lister(t *testing.T) {
t.Run("Lister returns GenericLister", func(t *testing.T) {
i := newInformer(t, "", apps[0], apps[1])
go i.Start(context.TODO())
require.NoError(t, i.WaitForSync(context.TODO()))
defer i.Stop()

lister := i.Lister()
require.NotNil(t, lister)

obj, err := lister.ByNamespace("argocd").Get("test1")
require.NoError(t, err)
require.NotNil(t, obj)

app, ok := obj.(*v1alpha1.Application)
require.True(t, ok)
assert.Equal(t, "test1", app.Name)
assert.Equal(t, "argocd", app.Namespace)
})

t.Run("Lister can list objects", func(t *testing.T) {
i := newInformer(t, "", apps[0], apps[1])
go i.Start(context.TODO())
require.NoError(t, i.WaitForSync(context.TODO()))
defer i.Stop()

lister := i.Lister()
objs, err := lister.List(labels.Everything())
require.NoError(t, err)
assert.Len(t, objs, 2)
})
}

func init() {
logrus.SetLevel(logrus.TraceLevel)
}
12 changes: 12 additions & 0 deletions internal/informer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
)

Expand Down Expand Up @@ -105,3 +106,14 @@ func WithResyncPeriod[T runtime.Object](d time.Duration) InformerOption[T] {
return nil
}
}

// WithGroupResource sets the group and resource for the informer's lister.
func WithGroupResource[T runtime.Object](group, resource string) InformerOption[T] {
return func(i *Informer[T]) error {
i.groupResource = schema.GroupResource{
Group: group,
Resource: resource,
}
return nil
}
}
8 changes: 7 additions & 1 deletion internal/manager/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,14 @@ func (m *ApplicationManager) Delete(ctx context.Context, namespace string, incom
// be returned.
func (m *ApplicationManager) update(ctx context.Context, upsert bool, incoming *v1alpha1.Application, updateFn updateTransformer, patchFn patchTransformer) (*v1alpha1.Application, error) {
var updated *v1alpha1.Application

if ctx == nil {
ctx = context.Background()
}
ctxForUpdate := context.WithValue(ctx, backend.ForUpdateContextKey, true)

err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
existing, ierr := m.applicationBackend.Get(ctx, incoming.Name, incoming.Namespace)
existing, ierr := m.applicationBackend.Get(ctxForUpdate, incoming.Name, incoming.Namespace)
Copy link
Contributor Author

@juanxiu juanxiu Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jannfis

The reason why forUpdate is necessary lies here. Kubernetes implements Optimistic Concurrency Control through resourceVersion. When an Update request is made, the API server compares the resourceVersion in the request with the one currently stored, and if they differ, it returns a Conflict.

The issue arises when using the informer cache during updates. Although the informer is synchronized with the API server, there is a slight lag, meaning that the cache may return a stale resourceVersion. As a result, the Update request fails, and even when using retry.RetryOnConflict() to retry, it keeps reading the same stale resourceVersion from the cache, causing repeated failures.

In contrast, when forUpdate=true, the object is fetched directly from the API server, guaranteeing the latest resourceVersion. This ensures that during retries, the latest version is retrieved, allowing the Update to succeed.

Therefore, we use the informer cache for regular reads to optimize performance, and direct API reads during updates to ensure correctness.

if ierr != nil {
if errors.IsNotFound(ierr) && upsert {
updated, ierr = m.Create(ctx, incoming)
Expand Down
14 changes: 14 additions & 0 deletions internal/manager/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,21 @@ func fakeInformer(t *testing.T, namespace string, objects ...runtime.Object) (*f
return appC.ArgoprojV1alpha1().Applications(namespace).Watch(ctx, opts)
}),
informer.WithNamespaceScope[*v1alpha1.Application](namespace),
informer.WithGroupResource[*v1alpha1.Application]("argoproj.io", "applications"),
)
require.NoError(t, err)

go func() {
err = informer.Start(context.Background())
if err != nil {
t.Fatalf("failed to start informer: %v", err)
}
}()

if err = informer.WaitForSync(context.Background()); err != nil {
t.Fatalf("failed to wait for informer sync: %v", err)
}

return appC, informer
}

Expand Down Expand Up @@ -210,6 +223,7 @@ func Test_ManagerUpdateManaged(t *testing.T) {
require.NoError(t, err)

updated, err := mgr.UpdateManagedApp(context.Background(), incoming)

require.NoError(t, err)
require.NotNil(t, updated)

Expand Down
Loading
Loading