Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite events handling to remove watermill #130

Merged
merged 6 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions entx/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var EventsHookAnnotationName = "INFRA9_EVENTHOOKS"

// EventsHookAnnotation provides a ent.Annotation spec. These shouldn't be set directly, you should use EventsHookAdditionalSubject() and EventsHookSubjectName instead
type EventsHookAnnotation struct {
SubjectName string
IsAdditionalSubjectField bool
SubjectName string
AdditionalSubjectRelation string
}

// Name implements the ent Annotation interface.
Expand All @@ -29,9 +29,9 @@ func (a EventsHookAnnotation) Name() string {
}

// EventsHookAdditionalSubject marks this field as a field to return as an additional subject
func EventsHookAdditionalSubject() *EventsHookAnnotation {
func EventsHookAdditionalSubject(relation string) *EventsHookAnnotation {
return &EventsHookAnnotation{
IsAdditionalSubjectField: true,
AdditionalSubjectRelation: relation,
}
}

Expand Down
63 changes: 49 additions & 14 deletions entx/template/event_hooks.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

{{ $genPackage := base $.Config.Package }}

import "go.infratographer.com/permissions-api/pkg/permissions"

{{- range $node := $.Nodes }}
{{- if $nodeAnnotation := $node.Annotations.INFRA9_EVENTHOOKS }}
{{- if ne $nodeAnnotation.SubjectName "" }}
Expand All @@ -17,6 +19,7 @@
return hook.{{ $node.Name }}Func(func(ctx context.Context, m *generated.{{ $node.Name }}Mutation) (ent.Value, error) {
var err error
additionalSubjects := []gidx.PrefixedID{}
relationships := []events.AuthRelationshipRelation{}

objID, ok := m.{{ $node.ID.MutationGet }}()
if !ok {
Expand All @@ -40,7 +43,7 @@
{{ $currentValue }} := ""
{{ $f.Name }}, ok := m.{{ $f.MutationGet }}()
{{- $annotation := $f.Annotations.INFRA9_EVENTHOOKS }}
{{- if $annotation.IsAdditionalSubjectField }}
{{- if $annotation.AdditionalSubjectRelation }}
if !ok && !m.Op().Is(ent.OpCreate) {
// since we are doing an update or delete and these fields didn't change, load the "old" value
{{ $f.Name }}, err = m.{{ $f.MutationGetOld }}(ctx)
Expand All @@ -51,9 +54,19 @@
{{- if $f.Optional }}
if {{ $f.Name }} != gidx.NullPrefixedID {
additionalSubjects = append(additionalSubjects, {{ $f.Name }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: {{ $f.Name }},
})
}
{{- else }}
additionalSubjects = append(additionalSubjects, {{ $f.Name }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: {{ $f.Name }},
})
{{- end }}
{{ end }}

Expand Down Expand Up @@ -99,13 +112,19 @@
}
{{ end }}
{{ end }}

if len(relationships) != 0 {
if err := permissions.CreateAuthRelationships(ctx, "{{ $nodeAnnotation.SubjectName }}", objID, relationships...); err != nil {
return nil, fmt.Errorf("relationship request failed with error: %w", err)
}
}

msg := events.ChangeMessage{
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
FieldChanges: changeset,
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
FieldChanges: changeset,
}

// complete the mutation before we process the event
Expand All @@ -114,7 +133,7 @@
return retValue, err
}

if err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
if _, err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
return nil, fmt.Errorf("failed to publish change: %w", err)
}

Expand All @@ -128,6 +147,7 @@
func(next ent.Mutator) ent.Mutator {
return hook.{{ $node.Name }}Func(func(ctx context.Context, m *generated.{{ $node.Name }}Mutation) (ent.Value, error) {
additionalSubjects := []gidx.PrefixedID{}
relationships := []events.AuthRelationshipRelation{}

objID, ok := m.{{ $node.ID.MutationGet }}()
if !ok {
Expand All @@ -142,33 +162,48 @@
{{- range $f := $node.Fields }}
{{- if not $f.Sensitive }}
{{- $annotation := $f.Annotations.INFRA9_EVENTHOOKS }}
{{- if $annotation.IsAdditionalSubjectField }}
{{- if $annotation.AdditionalSubjectRelation }}
{{- if $f.Optional }}
if dbObj.{{ $f.MutationGet }} != gidx.NullPrefixedID {
additionalSubjects = append(additionalSubjects, dbObj.{{ $f.MutationGet }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: dbObj.{{ $f.MutationGet }},
})
}
{{- else }}
additionalSubjects = append(additionalSubjects, dbObj.{{ $f.MutationGet }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: dbObj.{{ $f.MutationGet }},
})
{{- end }}
{{ end }}
{{ end }}
{{ end }}

if len(relationships) != 0 {
if err := permissions.DeleteAuthRelationships(ctx, "{{ $nodeAnnotation.SubjectName }}", objID, relationships...); err != nil {
return nil, fmt.Errorf("relationship request failed with error: %w", err)
}
}

// we have all the info we need, now complete the mutation before we process the event
retValue, err := next.Mutate(ctx, m)
if err != nil {
return retValue, err
}

msg := events.ChangeMessage{
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
}


if err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
if _, err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
return nil, fmt.Errorf("failed to publish change: %w", err)
}

Expand Down
74 changes: 29 additions & 45 deletions events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,46 @@ import (

"github.com/spf13/pflag"
"github.com/spf13/viper"

"go.infratographer.com/x/viperx"
"go.uber.org/multierr"
"go.uber.org/zap"
)

var defaultTimeout = time.Second * 10

// PublisherConfig handles reading in all the config values available for setting up a pubsub publisher
type PublisherConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
Source string `mapstructure:"source"`
NATSConfig NATSConfig `mapstructure:"nats"`
}
const (
defaultTimeout = time.Second * 10
tracerName = "go.infratographer.com/x/events"
)

// SubscriberConfig handles reading in all the config values available for setting up a pubsub publisher
type SubscriberConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
QueueGroup string `mapstructure:"queueGroup"`
NATSConfig NATSConfig `mapstructure:"nats"`
// Config contains event provider configs.
type Config struct {
NATS NATSConfig `mapstructure:"nats"`
}

// NATSConfig handles reading in all pubsub values specific to NATS
type NATSConfig struct {
Token string `mapstructure:"token"`
CredsFile string `mapstructure:"credsFile"`
// MustViperFlags returns the cobra flags and viper config for events.
func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet, appName string) {
MustViperFlagsForNATS(v, flags, appName)
}

// MustViperFlagsForPublisher returns the cobra flags and viper config for an event publisher
func MustViperFlagsForPublisher(v *viper.Viper, flags *pflag.FlagSet, appName string) {
flags.String("events-publisher-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.publisher.url", flags.Lookup("events-publisher-url"))
// Option configures a connection option.
type Option func(config *Config) error

v.MustBindEnv("events.publisher.timeout")
v.MustBindEnv("events.publisher.prefix")
v.MustBindEnv("events.publisher.source")
v.MustBindEnv("events.publisher.nats.token")
v.MustBindEnv("events.publisher.nats.credsFile")
// WithLogger sets the logger for the connection.
func WithLogger(logger *zap.SugaredLogger) Option {
return func(config *Config) error {
config.NATS.logger = logger

v.SetDefault("events.publisher.timeout", defaultTimeout)
v.SetDefault("events.publisher.source", appName)
return nil
}
}

// MustViperFlagsForSubscriber returns the cobra flags and viper config for an event subscriber
func MustViperFlagsForSubscriber(v *viper.Viper, flags *pflag.FlagSet) {
flags.String("events-subscriber-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.subscriber.url", flags.Lookup("events-subscriber-url"))
flags.String("events-subscriber-queuegroup", "", "subscriber queue group")
viperx.MustBindFlag(v, "events.subscriber.queueGroup", flags.Lookup("events-subscriber-queuegroup"))
// WithNATSOptions configures nats options.
func WithNATSOptions(options ...NATSOption) Option {
return func(config *Config) error {
var err error

v.MustBindEnv("events.subscriber.timeout")
v.MustBindEnv("events.subscriber.prefix")
v.MustBindEnv("events.subscriber.nats.token")
v.MustBindEnv("events.subscriber.nats.credsFile")
for _, opt := range options {
err = multierr.Append(err, opt(&config.NATS))
mikemrm marked this conversation as resolved.
Show resolved Hide resolved
}

v.SetDefault("events.subscriber.timeout", defaultTimeout)
return err
}
}
69 changes: 69 additions & 0 deletions events/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events

import (
"context"

"go.uber.org/multierr"
)

// Connection defines a connection handler.
type Connection interface {
// Gracefully close the connection.
Shutdown(ctx context.Context) error

// Source gives you the raw underlying connection object.
Source() any

Subscriber
Publisher

AuthRelationshipSubscriber
AuthRelationshipPublisher
}

// Subscriber specifies subscriber methods.
type Subscriber interface {
// SubscribeChanges subscribes to the provided topic responding with an ChangeMessage message.
SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
// SubscribeEvents subscribes to the provided topic responding with an EventMessage message.
SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
}

// Publisher specifies publisher methods.
type Publisher interface {
// PublishChange publishes to the specified topic with the message given.
PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
// PublishEvent publishes to the specified topic with the message given.
PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
}

// AuthRelationshipSubscriber specifies the auth relationship subscriber methods.
type AuthRelationshipSubscriber interface {
// SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message.
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
}

// AuthRelationshipPublisher specifies the auth relationship publisher methods.
type AuthRelationshipPublisher interface {
// PublishAuthRelationshipRequest publishes to the specified topic with the message given.
PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
}

// NewConnection creates a new Connection from the provided config.
func NewConnection(config Config, options ...Option) (Connection, error) {
var err error

for _, opt := range options {
err = multierr.Append(err, opt(&config))
}

if err != nil {
return nil, err
}

if config.NATS.Configured() {
return NewNATSConnection(config.NATS)
}

return nil, ErrProviderNotConfigured
}
32 changes: 32 additions & 0 deletions events/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package events

import "errors"

var (
// ErrProviderNotConfigured is an error packages should return if no events provider is configured.
ErrProviderNotConfigured = errors.New("events provider not configured")

// ErrMissingChangeMessageEventType is returned when the event message has the incorrect field EventType value.
ErrMissingChangeMessageEventType = errors.New("change message EventType field required")
// ErrMissingChangeMessageSubjectID is returned when the event message has the incorrect field SubjectID value.
ErrMissingChangeMessageSubjectID = errors.New("change message SubjectID field required")

// ErrMissingEventMessageEventType is returned when the event message has the incorrect field EventType value.
ErrMissingEventMessageEventType = errors.New("event message EventType field required")
// ErrMissingEventMessageSubjectID is returned when the event message has the incorrect field SubjectID value.
ErrMissingEventMessageSubjectID = errors.New("event message SubjectID field required")

// ErrInvalidAuthRelationshipRequestAction is returned when the event message has the incorrect field Action value.
ErrInvalidAuthRelationshipRequestAction = errors.New("auth relationship request message Action field must be write or delete")
// ErrMissingAuthRelationshipRequestObjectID is returned when the event message has the incorrect field ObjectID value.
ErrMissingAuthRelationshipRequestObjectID = errors.New("auth relationship request message ObjectID field required")
// ErrMissingAuthRelationshipRequestRelation is returned when the event message has no relations defined.
ErrMissingAuthRelationshipRequestRelation = errors.New("auth relationship request message Relations field required")
// ErrMissingAuthRelationshipRequestRelationRelation is returned when the event message Relations has the incorrect field for Relation value.
ErrMissingAuthRelationshipRequestRelationRelation = errors.New("auth relationship request message Relations Relation field required")
// ErrMissingAuthRelationshipRequestRelationSubjectID is returned when the event message Relations has the incorrect field SubjectID value.
ErrMissingAuthRelationshipRequestRelationSubjectID = errors.New("auth relationship request message Relations SubjectID field required")

// ErrRequestNoResponders is returned when a request is attempted but no responder is listening.
ErrRequestNoResponders = errors.New("no responders for request")
)
Loading