Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e277c34
self-service from event
mayzhang2000 May 4, 2023
d07364d
chore: replace github.com/ghodss/yaml with sigs.k8s.io/yaml (#175)
Juneezee May 5, 2023
905c884
feat: Adding new PagerDuty integration based on Events API v2 (#105)
EricTendian May 8, 2023
560c56b
self-service from event
mayzhang2000 May 9, 2023
dc25793
self-service from event
mayzhang2000 May 9, 2023
714c7c3
self-service from event
mayzhang2000 May 4, 2023
c78fc69
self-service from event
mayzhang2000 May 9, 2023
8c38002
self-service from event
mayzhang2000 May 9, 2023
518f9d0
Merge remote-tracking branch 'origin/self-service' into self-service
mayzhang2000 May 9, 2023
ad9af61
clean up factory.go
mayzhang2000 May 10, 2023
3669f0a
error handleing in factory.go
mayzhang2000 May 10, 2023
7a729d6
error handleing in factory.go
mayzhang2000 May 11, 2023
c0d5bab
error handleing in factory.go
mayzhang2000 May 15, 2023
b535cad
error handleing in factory.go
mayzhang2000 May 15, 2023
f7be52d
error handleing in factory.go
mayzhang2000 May 15, 2023
8e233b8
error handleing in factory.go
mayzhang2000 May 16, 2023
6df2afd
error handleing in factory.go
mayzhang2000 May 17, 2023
5625a21
better caching
mayzhang2000 May 18, 2023
152986e
better caching
mayzhang2000 May 19, 2023
626964f
Merge branch 'master' of https://github.com/argoproj/notifications-en…
zachaller Jul 5, 2023
a6d34f8
refactor
zachaller Jul 5, 2023
d55c301
unused
zachaller Jul 5, 2023
2a125ca
lint
zachaller Jul 5, 2023
d74aa14
bump ci
zachaller Jul 5, 2023
5f03d01
small fixes
zachaller Jul 6, 2023
4b1a742
rename
zachaller Jul 6, 2023
e43f9de
rename
zachaller Jul 6, 2023
0bfbd0f
rename
zachaller Jul 6, 2023
0014c33
change log line
zachaller Jul 6, 2023
3b951a8
update tests
zachaller Jul 6, 2023
d882bfd
Merge branch 'master' of https://github.com/argoproj/notifications-en…
zachaller Jul 6, 2023
f0a3872
continue on error
zachaller Jul 7, 2023
dcfceac
continue on errors
zachaller Jul 7, 2023
3a6ced8
add godoc
zachaller Jul 7, 2023
40120ab
continue on error
zachaller Jul 7, 2023
806df6c
fix error by going back to non namespace support
zachaller Jul 7, 2023
d36e32b
fix log
zachaller Jul 7, 2023
2264742
improve log on error
zachaller Jul 10, 2023
ba8bd6e
Merge branch 'master' of https://github.com/argoproj/notifications-en…
zachaller Jul 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Run golangci-lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.45.2
version: v1.49.0
args: --timeout 5m
test:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
k8s.io/api v0.23.3
k8s.io/apimachinery v0.23.3
k8s.io/client-go v0.23.3
k8s.io/utils v0.0.0-20211116205334-6203023598ed
sigs.k8s.io/yaml v1.3.0
)

Expand Down Expand Up @@ -84,7 +85,6 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
DefaultTriggers []string
// ServiceDefaultTriggers holds list of default triggers per service
ServiceDefaultTriggers map[string][]string
Namespace string
}

// Returns list of destinations for the specified trigger
Expand Down Expand Up @@ -76,6 +77,7 @@ func ParseConfig(configMap *v1.ConfigMap, secret *v1.Secret) (*Config, error) {
Triggers: map[string][]triggers.Condition{},
ServiceDefaultTriggers: map[string][]string{},
Templates: map[string]services.Notification{},
Namespace: configMap.Namespace,
}
if subscriptionYaml, ok := configMap.Data["subscriptions"]; ok {
if err := yaml.Unmarshal([]byte(subscriptionYaml), &cfg.Subscriptions); err != nil {
Expand Down
132 changes: 100 additions & 32 deletions pkg/api/factory.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package api

import (
"fmt"
"sync"

log "github.com/sirupsen/logrus"

"k8s.io/utils/strings/slices"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -18,27 +23,38 @@ type Settings struct {
SecretName string
// InitGetVars returns a function that produces notifications context variables
InitGetVars func(cfg *Config, configMap *v1.ConfigMap, secret *v1.Secret) (GetVars, error)
// DefaultNamespace default namespace for ConfigMap and Secret.
// For self-service notification, we get notification configurations from rollout resource namespace
// and also the default namespace
DefaultNamespace string
}

// Factory creates an API instance
type Factory interface {
GetAPI() (API, error)
GetAPIsFromNamespace(namespace string) (map[string]API, error)
}

type apiFactory struct {
Settings

cmLister v1listers.ConfigMapNamespaceLister
secretLister v1listers.SecretNamespaceLister
cmLister v1listers.ConfigMapLister
secretLister v1listers.SecretLister
lock sync.Mutex
api API
apiMap map[string]API
}

func NewFactory(settings Settings, namespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory {
// NewFactory creates a new API factory if namespace is not empty, it will override the default namespace set in settings
func NewFactory(settings Settings, defaultNamespace string, secretsInformer cache.SharedIndexInformer, cmInformer cache.SharedIndexInformer) *apiFactory {
if defaultNamespace != "" {
settings.DefaultNamespace = defaultNamespace
}

factory := &apiFactory{
Settings: settings,
cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()).ConfigMaps(namespace),
secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()).Secrets(namespace),
cmLister: v1listers.NewConfigMapLister(cmInformer.GetIndexer()),
secretLister: v1listers.NewSecretLister(secretsInformer.GetIndexer()),
apiMap: make(map[string]API),
}

secretsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -70,12 +86,15 @@ func (f *apiFactory) invalidateIfHasName(name string, obj interface{}) {
return
}
if metaObj.GetName() == name {
f.invalidateCache()
f.lock.Lock()
defer f.lock.Unlock()
f.apiMap[metaObj.GetNamespace()] = nil
log.Info("invalidated cache for resource in namespace: ", metaObj.GetNamespace(), " with the name: ", metaObj.GetName())
}
}

func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error) {
cm, err := f.cmLister.Get(f.ConfigMapName)
func (f *apiFactory) getConfigMapAndSecretWithListers(cmLister v1listers.ConfigMapNamespaceLister, secretLister v1listers.SecretNamespaceLister) (*v1.ConfigMap, *v1.Secret, error) {
cm, err := cmLister.Get(f.ConfigMapName)
if err != nil {
if errors.IsNotFound(err) {
cm = &v1.ConfigMap{}
Expand All @@ -84,7 +103,7 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error)
}
}

secret, err := f.secretLister.Get(f.SecretName)
secret, err := secretLister.Get(f.SecretName)
if err != nil {
if errors.IsNotFound(err) {
secret = &v1.Secret{}
Expand All @@ -93,36 +112,85 @@ func (f *apiFactory) getConfigMapAndSecret() (*v1.ConfigMap, *v1.Secret, error)
}
}

if errors.IsNotFound(err) {
return cm, secret, nil
}
return cm, secret, err
}

func (f *apiFactory) invalidateCache() {
f.lock.Lock()
defer f.lock.Unlock()
f.api = nil
func (f *apiFactory) getConfigMapAndSecret(namespace string) (*v1.ConfigMap, *v1.Secret, error) {
cmLister := f.cmLister.ConfigMaps(namespace)
secretLister := f.secretLister.Secrets(namespace)

return f.getConfigMapAndSecretWithListers(cmLister, secretLister)
}

func (f *apiFactory) GetAPI() (API, error) {
apis, err := f.GetAPIsFromNamespace(f.Settings.DefaultNamespace)
if err != nil {
return nil, err
}
return apis[f.Settings.DefaultNamespace], nil
}

// GetAPIsFromNamespace returns a map of API instances for a given namespace, if there is an error in populating the API for a namespace, it will be skipped
// and the error will be logged and returned. The caller is responsible for handling the error. The API map will also be returned with any successfully constructed
// API instances.
func (f *apiFactory) GetAPIsFromNamespace(namespace string) (map[string]API, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.api == nil {
cm, secret, err := f.getConfigMapAndSecret()
if err != nil {
return nil, err
}
cfg, err := ParseConfig(cm, secret)
if err != nil {
return nil, err
}
getVars, err := f.InitGetVars(cfg, cm, secret)
if err != nil {
return nil, err
}
api, err := NewAPI(*cfg, getVars)
if err != nil {
return nil, err

apis := make(map[string]API)

// namespaces to look for notification configurations
namespaces := []string{namespace}
if !slices.Contains(namespaces, f.Settings.DefaultNamespace) {
namespaces = append(namespaces, f.Settings.DefaultNamespace)
}

errors := []error{}
for _, namespace := range namespaces {
if f.apiMap[namespace] == nil {
api, err := f.getApiFromNamespace(namespace)
if err != nil {
log.Error("error getting api from namespace: ", namespace, " error: ", err)
errors = append(errors, err)
continue
}
f.apiMap[namespace] = api
apis[namespace] = f.apiMap[namespace]
} else {
apis[namespace] = f.apiMap[namespace]
}
f.api = api
}
return f.api, nil

if len(errors) > 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder to myself: need to discuss this with you.

return apis, fmt.Errorf("errors getting apis: %s", errors)
}
return apis, nil
}

func (f *apiFactory) getApiFromNamespace(namespace string) (API, error) {
cm, secret, err := f.getConfigMapAndSecret(namespace)
if err != nil {
return nil, err
}
return f.getApiFromConfigmapAndSecret(cm, secret)

}

func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.Secret) (API, error) {
cfg, err := ParseConfig(cm, secret)
if err != nil {
return nil, err
}
getVars, err := f.InitGetVars(cfg, cm, secret)
if err != nil {
return nil, err
}
api, err := NewAPI(*cfg, getVars)
if err != nil {
return nil, err
}
return api, nil
}
68 changes: 50 additions & 18 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,18 @@ func NewController(
return ctrl
}

// NewControllerWithNamespaceSupport For self-service notification
func NewControllerWithNamespaceSupport(
client dynamic.NamespaceableResourceInterface,
informer cache.SharedIndexInformer,
apiFactory api.Factory,
opts ...Opts,
) *notificationController {
ctrl := NewController(client, informer, apiFactory, opts...)
ctrl.namespaceSupport = true
return ctrl
}

type notificationController struct {
client dynamic.NamespaceableResourceInterface
informer cache.SharedIndexInformer
Expand All @@ -152,6 +164,7 @@ type notificationController struct {
alterDestinations func(obj v1.Object, destinations services.Destinations, cfg api.Config) services.Destinations
toUnstructured func(obj v1.Object) (*unstructured.Unstructured, error)
eventCallback func(eventSequence NotificationEventSequence)
namespaceSupport bool
}

func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
Expand All @@ -169,12 +182,9 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
log.Warn("Controller has stopped.")
}

func (c *notificationController) processResource(resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
apiNamespace := api.GetConfig().Namespace
notificationsState := NewStateFromRes(resource)
api, err := c.apiFactory.GetAPI()
if err != nil {
return nil, err
}

destinations := c.getDestinations(resource, api.GetConfig())
if len(destinations) == 0 {
Expand All @@ -189,8 +199,8 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l
for trigger, destinations := range destinations {
res, err := api.RunTrigger(trigger, un.Object)
if err != nil {
logEntry.Debugf("Failed to execute condition of trigger %s: %v", trigger, err)
eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v", trigger, err))
logEntry.Debugf("Failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace)
eventSequence.addWarning(fmt.Errorf("failed to execute condition of trigger %s: %v using the configuration in namespace %s", trigger, err, apiNamespace))
}
logEntry.Infof("Trigger %s result: %v", trigger, res)

Expand All @@ -206,22 +216,22 @@ func (c *notificationController) processResource(resource v1.Object, logEntry *l

for _, to := range destinations {
if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed {
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v'", trigger, cr.Key, to)
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
eventSequence.addDelivered(NotificationDelivery{
Trigger: trigger,
Destination: to,
AlreadyNotified: true,
})
} else {
logEntry.Infof("Sending notification about condition '%s.%s' to '%v'", trigger, cr.Key, to)
logEntry.Infof("Sending notification about condition '%s.%s' to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
if err := api.Send(un.Object, cr.Templates, to); err != nil {
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v",
to, resource.GetNamespace(), resource.GetName(), err)
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
to, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false)
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v", trigger, to, err))
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace))
} else {
logEntry.Debugf("Notification %s was sent", to.Recipient)
logEntry.Debugf("Notification %s was sent using the configuration in namespace %s", to.Recipient, apiNamespace)
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, true)
eventSequence.addDelivered(NotificationDelivery{
Trigger: trigger,
Expand Down Expand Up @@ -294,7 +304,31 @@ func (c *notificationController) processQueueItem() (processNext bool) {
}
}

annotations, err := c.processResource(resource, logEntry, &eventSequence)
if !c.namespaceSupport {
api, err := c.apiFactory.GetAPI()
if err != nil {
logEntry.Errorf("Failed to get api: %v", err)
eventSequence.addError(err)
return
}
c.processResource(api, resource, logEntry, &eventSequence)
} else {
apisWithNamespace, err := c.apiFactory.GetAPIsFromNamespace(resource.GetNamespace())
if err != nil {
logEntry.Errorf("Failed to get api with namespace: %v", err)
eventSequence.addError(err)
}
for _, api := range apisWithNamespace {
c.processResource(api, resource, logEntry, &eventSequence)
}
}
logEntry.Info("Processing completed")

return
}

func (c *notificationController) processResource(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) {
annotations, err := c.processResourceWithAPI(api, resource, logEntry, eventSequence)
if err != nil {
logEntry.Errorf("Failed to process: %v", err)
eventSequence.addError(err)
Expand All @@ -307,7 +341,7 @@ func (c *notificationController) processQueueItem() (processNext bool) {
annotationsPatch[k] = v
}
for k := range resource.GetAnnotations() {
if _, ok = annotations[k]; !ok {
if _, ok := annotations[k]; !ok {
annotationsPatch[k] = nil
}
}
Expand All @@ -329,11 +363,9 @@ func (c *notificationController) processQueueItem() (processNext bool) {
if err := c.informer.GetStore().Update(resource); err != nil {
logEntry.Warnf("Failed to store update resource in informer: %v", err)
eventSequence.addWarning(fmt.Errorf("failed to store update resource in informer: %v", err))
return
}
}
logEntry.Info("Processing completed")

return
}

func mapsEqual(first, second map[string]string) bool {
Expand Down
Loading