Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/runtime/cache/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type AccountCache struct {
log logr.Logger
roleARNs map[string]string
configMapCreated bool
hasSynced func() bool
}

// NewAccountCache instanciate a new AccountCache.
Expand Down Expand Up @@ -111,6 +112,7 @@ func (c *AccountCache) Run(clientSet kubernetes.Interface, stopCh <-chan struct{
},
})
go informer.Run(stopCh)
c.hasSynced = informer.HasSynced
}

// GetAccountRoleARN queries the AWS accountID associated Role ARN
Expand Down
11 changes: 10 additions & 1 deletion pkg/runtime/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package cache

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/jaypipes/envutil"
kubernetes "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

const (
Expand Down Expand Up @@ -96,7 +98,14 @@ func (c Caches) Run(clientSet kubernetes.Interface) {
if c.Namespaces != nil {
c.Namespaces.Run(clientSet, stopCh)
}
c.stopCh = stopCh
}

// WaitForCachesToSync waits for both of the namespace and configMap
// informers to sync - by checking their hasSynced functions.
func (c Caches) WaitForCachesToSync(ctx context.Context) bool {
namespaceSynced := cache.WaitForCacheSync(ctx.Done(), c.Namespaces.hasSynced)
accountSynced := cache.WaitForCacheSync(ctx.Done(), c.Accounts.hasSynced)
return namespaceSynced && accountSynced
}

// Stop closes the stop channel and cause all the SharedInformers
Expand Down
4 changes: 4 additions & 0 deletions pkg/runtime/cache/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type NamespaceCache struct {
watchScope []string
// ignored is the list of namespaces we are ignoring
ignored []string
// hasSynced is a function that will return true if namespace informer
// has received "at least" once the full list of the namespaces.
hasSynced func() bool
}

// NewNamespaceCache instanciate a new NamespaceCache.
Expand Down Expand Up @@ -160,6 +163,7 @@ func (c *NamespaceCache) Run(clientSet kubernetes.Interface, stopCh <-chan struc
},
})
go informer.Run(stopCh)
c.hasSynced = informer.HasSynced
}

// GetDefaultRegion returns the default region if it it exists
Expand Down
5 changes: 5 additions & 0 deletions pkg/runtime/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package runtime

import (
"context"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -233,6 +234,10 @@ func (c *serviceController) BindControllerManager(mgr ctrlrt.Manager, cfg ackcfg
// Run the caches. This will not block as the caches are run in
// separate goroutines.
cache.Run(clientSet)
// Wait for the caches to sync
ctx := context.TODO()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do 1 minute for this instead

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think indeed this will solve the failing unit tests, but imposes risk of race condition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can configure that specific failing unit test to not use CARM caches? it is only supposed to tests that the controller build happen correctly

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's happening here is that: the unit tests attempt to initialize a cache, but WaitForCachesToSync requires communication with the api server, which is not running during unit tests. so... the function waits indefinitely an eventually leads to timeouts..

synced := cache.WaitForCachesToSync(ctx)
c.log.Info("Waited for the caches to sync", "synced", synced)
}

if cfg.EnableAdoptedResourceReconciler {
Expand Down
5 changes: 4 additions & 1 deletion pkg/runtime/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ func TestServiceController(t *testing.T) {
require.Empty(recons)

mgr := &fakeManager{}
cfg := ackcfg.Config{}
cfg := ackcfg.Config{
// Disable caches, by setting a mono-namespace watch mode
WatchNamespace: "default",
}
err := sc.BindControllerManager(mgr, cfg)
require.Nil(err)

Expand Down