diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index bcab39f1..ba134fdc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -27,7 +27,7 @@ jobs: - name: Run golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.45.2 + version: v1.49.0 args: --timeout 5m test: runs-on: ubuntu-latest diff --git a/go.mod b/go.mod index 15709722..9cdbf203 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( k8s.io/api v0.23.3 k8s.io/apimachinery v0.23.3 k8s.io/client-go v0.23.3 + k8s.io/utils v0.0.0-20211116205334-6203023598ed sigs.k8s.io/yaml v1.3.0 ) @@ -84,7 +85,6 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect - k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect ) diff --git a/pkg/api/config.go b/pkg/api/config.go index eefc41db..cf1d0e0d 100644 --- a/pkg/api/config.go +++ b/pkg/api/config.go @@ -29,6 +29,7 @@ type Config struct { DefaultTriggers []string // ServiceDefaultTriggers holds list of default triggers per service ServiceDefaultTriggers map[string][]string + Namespace string } // Returns list of destinations for the specified trigger @@ -76,6 +77,7 @@ func ParseConfig(configMap *v1.ConfigMap, secret *v1.Secret) (*Config, error) { Triggers: map[string][]triggers.Condition{}, ServiceDefaultTriggers: map[string][]string{}, Templates: map[string]services.Notification{}, + Namespace: configMap.Namespace, } if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok { if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil { diff --git a/pkg/api/factory.go b/pkg/api/factory.go index 9fb09853..e280b192 100644 --- a/pkg/api/factory.go +++ b/pkg/api/factory.go @@ -1,8 +1,13 @@ package api import ( + "fmt" "sync" + log "github.com/sirupsen/logrus" + + "k8s.io/utils/strings/slices" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -18,27 +23,38 @@ type Settings struct { SecretName string // InitGetVars returns a function that produces notifications context variables InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error) + // DefaultNamespace default namespace for ConfigMap and Secret. + // For self-service notification, we get notification configurations from rollout resource namespace + // and also the default namespace + DefaultNamespace string } // Factory creates an API instance type Factory interface { GetAPI() (API, error) + GetAPIsFromNamespace(namespace string) (map[string]API, error) } type apiFactory struct { Settings - cmLister v1listers.ConfigMapNamespaceLister - secretLister v1listers.SecretNamespaceLister + cmLister v1listers.ConfigMapLister + secretLister v1listers.SecretLister lock sync.Mutex - api API + apiMap map[string]API } -func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { +// NewFactory creates a new API factory if namespace is not empty, it will override the default namespace set in settings +func NewFactory(settings Settings, defaultNamespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory { + if defaultNamespace != "" { + settings.DefaultNamespace = defaultNamespace + } + factory := &apiFactory{ Settings: settings, - cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace), - secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace), + cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()), + secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()), + apiMap: make(map[string]API), } secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -70,12 +86,15 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) { return } if metaObj.GetName() == name { - f.invalidateCache() + f.lock.Lock() + defer f.lock.Unlock() + f.apiMap[metaObj.GetNamespace()] = nil + log.Info("invalidated cache for resource in namespace: ", metaObj.GetNamespace(), " with the name: ", metaObj.GetName()) } } -func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) { - cm, err := f.cmLister.Get(f.ConfigMapName) +func (f *apiFactory) getConfigMapAndSecretWithListers(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) { + cm, err := cmLister.Get(f.ConfigMapName) if err != nil { if errors.IsNotFound(err) { cm = &v1.ConfigMap{} @@ -84,7 +103,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) } } - secret, err := f.secretLister.Get(f.SecretName) + secret, err := secretLister.Get(f.SecretName) if err != nil { if errors.IsNotFound(err) { secret = &v1.Secret{} @@ -93,36 +112,85 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) } } + if errors.IsNotFound(err) { + return cm, secret, nil + } return cm, secret, err } -func (f *apiFactory) invalidateCache() { - f.lock.Lock() - defer f.lock.Unlock() - f.api = nil +func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1.Secret, error) { + cmLister := f.cmLister.ConfigMaps(namespace) + secretLister := f.secretLister.Secrets(namespace) + + return f.getConfigMapAndSecretWithListers(cmLister, secretLister) } func (f *apiFactory) GetAPI() (API, error) { + apis, err := f.GetAPIsFromNamespace(f.Settings.DefaultNamespace) + if err != nil { + return nil, err + } + return apis[f.Settings.DefaultNamespace], nil +} + +// GetAPIsFromNamespace returns a map of API instances for a given namespace, if there is an error in populating the API for a namespace, it will be skipped +// and the error will be logged and returned. The caller is responsible for handling the error. The API map will also be returned with any successfully constructed +// API instances. +func (f *apiFactory) GetAPIsFromNamespace(namespace string) (map[string]API, error) { f.lock.Lock() defer f.lock.Unlock() - if f.api == nil { - cm, secret, err := f.getConfigMapAndSecret() - if err != nil { - return nil, err - } - cfg, err := ParseConfig(cm, secret) - if err != nil { - return nil, err - } - getVars, err := f.InitGetVars(cfg, cm, secret) - if err != nil { - return nil, err - } - api, err := NewAPI(*cfg, getVars) - if err != nil { - return nil, err + + apis := make(map[string]API) + + // namespaces to look for notification configurations + namespaces := []string{namespace} + if !slices.Contains(namespaces, f.Settings.DefaultNamespace) { + namespaces = append(namespaces, f.Settings.DefaultNamespace) + } + + errors := []error{} + for _, namespace := range namespaces { + if f.apiMap[namespace] == nil { + api, err := f.getApiFromNamespace(namespace) + if err != nil { + log.Error("error getting api from namespace: ", namespace, " error: ", err) + errors = append(errors, err) + continue + } + f.apiMap[namespace] = api + apis[namespace] = f.apiMap[namespace] + } else { + apis[namespace] = f.apiMap[namespace] } - f.api = api } - return f.api, nil + + if len(errors) > 0 { + return apis, fmt.Errorf("errors getting apis: %s", errors) + } + return apis, nil +} + +func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) { + cm, secret, err := f.getConfigMapAndSecret(namespace) + if err != nil { + return nil, err + } + return f.getApiFromConfigmapAndSecret(cm, secret) + +} + +func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.Secret) (API, error) { + cfg, err := ParseConfig(cm, secret) + if err != nil { + return nil, err + } + getVars, err := f.InitGetVars(cfg, cm, secret) + if err != nil { + return nil, err + } + api, err := NewAPI(*cfg, getVars) + if err != nil { + return nil, err + } + return api, nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d4441302..a492526f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -142,6 +142,18 @@ func NewController( return ctrl } +// NewControllerWithNamespaceSupport For self-service notification +func NewControllerWithNamespaceSupport( + client dynamic.NamespaceableResourceInterface, + informer cache.SharedIndexInformer, + apiFactory api.Factory, + opts ...Opts, +) *notificationController { + ctrl := NewController(client, informer, apiFactory, opts...) + ctrl.namespaceSupport = true + return ctrl +} + type notificationController struct { client dynamic.NamespaceableResourceInterface informer cache.SharedIndexInformer @@ -152,6 +164,7 @@ type notificationController struct { alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error) eventCallback func(eventSequence NotificationEventSequence) + namespaceSupport bool } func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { @@ -169,12 +182,9 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) { log.Warn("Controller has stopped.") } -func (c *notificationController) processResource(resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { +func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) { + apiNamespace := api.GetConfig().Namespace notificationsState := NewStateFromRes(resource) - api, err := c.apiFactory.GetAPI() - if err != nil { - return nil, err - } destinations := c.getDestinations(resource, api.GetConfig()) if len(destinations) == 0 { @@ -189,8 +199,8 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l for trigger, destinations := range destinations { res, err := api.RunTrigger(trigger, un.Object) if err != nil { - logEntry.Debugf("Failed to execute condition of trigger %s: %v", trigger, err) - eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v", trigger, err)) + logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace) + eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)) } logEntry.Infof("Trigger %s result: %v", trigger, res) @@ -206,22 +216,22 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l for _, to := range destinations { if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed { - logEntry.Infof("Notification about condition '%s.%s' already sent to '%v'", trigger, cr.Key, to) + logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) eventSequence.addDelivered(NotificationDelivery{ Trigger: trigger, Destination: to, AlreadyNotified: true, }) } else { - logEntry.Infof("Sending notification about condition '%s.%s' to '%v'", trigger, cr.Key, to) + logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace) if err := api.Send(un.Object, cr.Templates, to); err != nil { - logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v", - to, resource.GetNamespace(), resource.GetName(), err) + logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s", + to, resource.GetNamespace(), resource.GetName(), err, apiNamespace) notificationsState.SetAlreadyNotified(trigger, cr, to, false) c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false) - eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v", trigger, to, err)) + eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace)) } else { - logEntry.Debugf("Notification %s was sent", to.Recipient) + logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace) c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true) eventSequence.addDelivered(NotificationDelivery{ Trigger: trigger, @@ -294,7 +304,31 @@ func (c *notificationController) processQueueItem() (processNext bool) { } } - annotations, err := c.processResource(resource, logEntry, &eventSequence) + if !c.namespaceSupport { + api, err := c.apiFactory.GetAPI() + if err != nil { + logEntry.Errorf("Failed to get api: %v", err) + eventSequence.addError(err) + return + } + c.processResource(api, resource, logEntry, &eventSequence) + } else { + apisWithNamespace, err := c.apiFactory.GetAPIsFromNamespace(resource.GetNamespace()) + if err != nil { + logEntry.Errorf("Failed to get api with namespace: %v", err) + eventSequence.addError(err) + } + for _, api := range apisWithNamespace { + c.processResource(api, resource, logEntry, &eventSequence) + } + } + logEntry.Info("Processing completed") + + return +} + +func (c *notificationController) processResource(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) { + annotations, err := c.processResourceWithAPI(api, resource, logEntry, eventSequence) if err != nil { logEntry.Errorf("Failed to process: %v", err) eventSequence.addError(err) @@ -307,7 +341,7 @@ func (c *notificationController) processQueueItem() (processNext bool) { annotationsPatch[k] = v } for k := range resource.GetAnnotations() { - if _, ok = annotations[k]; !ok { + if _, ok := annotations[k]; !ok { annotationsPatch[k] = nil } } @@ -329,11 +363,9 @@ func (c *notificationController) processQueueItem() (processNext bool) { if err := c.informer.GetStore().Update(resource); err != nil { logEntry.Warnf("Failed to store update resource in informer: %v", err) eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err)) + return } } - logEntry.Info("Processing completed") - - return } func mapsEqual(first, second map[string]string) bool { diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2128e15d..2f76a1c8 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -90,7 +90,7 @@ func newController(t *testing.T, ctx context.Context, client dynamic.Interface, go informer.Run(ctx.Done()) - c := NewController(resourceClient, informer, &mocks.FakeFactory{Api: mockAPI}, opts...) + c := NewControllerWithNamespaceSupport(resourceClient, informer, &mocks.FakeFactory{Api: mockAPI}, opts...) if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { return nil, nil, errors.New("failed to sync informers") } @@ -115,7 +115,7 @@ func TestSendsNotificationIfTriggered(t *testing.T) { return true }), []string{"test"}, services.Destination{Service: "mock", Recipient: "recipient"}).Return(nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -141,7 +141,7 @@ func TestDoesNotSendNotificationIfAnnotationPresent(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil) - _, err = ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + _, err = ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -163,7 +163,7 @@ func TestRemovesAnnotationIfNoTrigger(t *testing.T) { api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil) - annotations, err := ctrl.processResource(app, logEntry, &NotificationEventSequence{}) + annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{}) if err != nil { logEntry.Errorf("Failed to process: %v", err) } @@ -298,7 +298,7 @@ func TestWithEventCallback(t *testing.T) { description: "EventCallback should be invoked with non-nil error on send failure", sendErr: errors.New("this is a send error"), expectedErrors: []error{ - errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error"), + errors.New("failed to deliver notification my-trigger to {mock recipient}: this is a send error using the configuration in namespace "), }, }, { @@ -323,6 +323,7 @@ func TestWithEventCallback(t *testing.T) { })) ctrl, api, err := newController(t, ctx, newFakeClient(app), WithEventCallback(mockEventCallback)) + ctrl.namespaceSupport = false assert.NoError(t, err) ctrl.apiFactory = &mocks.FakeFactory{Api: api, Err: tc.apiErr} diff --git a/pkg/mocks/factory.go b/pkg/mocks/factory.go index 136955b1..7ebbb5a1 100644 --- a/pkg/mocks/factory.go +++ b/pkg/mocks/factory.go @@ -10,3 +10,9 @@ type FakeFactory struct { func (f *FakeFactory) GetAPI() (api.API, error) { return f.Api, f.Err } + +func (f *FakeFactory) GetAPIsFromNamespace(namespace string) (map[string]api.API, error) { + apiMap := make(map[string]api.API) + apiMap[namespace] = f.Api + return apiMap, f.Err +} diff --git a/pkg/services/pagerduty.go b/pkg/services/pagerduty.go index d08b8d6c..a151bced 100644 --- a/pkg/services/pagerduty.go +++ b/pkg/services/pagerduty.go @@ -93,7 +93,7 @@ func (p pagerdutyService) Send(notification Notification, dest Destination) erro Priority: &pagerduty.APIReference{ID: priorityID, Type: "priority"}, Title: title, Urgency: urgency, - Body: &pagerduty.APIDetails{Type: "incident_details ", Details: body}, + Body: &pagerduty.APIDetails{Type: "incident_details ", Details: body}, } incident, err := pagerDutyClient.CreateIncidentWithContext(context.TODO(), p.opts.From, input) if err != nil {