Skip to content

Commit

Permalink
convert events to handle AuthRelationshipRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm committed Aug 4, 2023
1 parent 17b64ec commit f6c33a4
Show file tree
Hide file tree
Showing 7 changed files with 313 additions and 210 deletions.
2 changes: 1 addition & 1 deletion internal/api/relationships.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (r *Router) relationshipDelete(c echo.Context) error {
Subject: relatedResource,
}

_, err = r.engine.DeleteRelationship(ctx, relationship)
_, err = r.engine.DeleteRelationships(ctx, relationship)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "error deleting relationship").SetInternal(err)
}
Expand Down
260 changes: 170 additions & 90 deletions internal/pubsub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,36 @@ package pubsub

import (
"context"
"errors"
"fmt"
"sync"
"time"

"go.infratographer.com/permissions-api/internal/query"
"go.infratographer.com/permissions-api/internal/types"
"go.infratographer.com/x/events"
"go.infratographer.com/x/gidx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
)

const nakDelay = 10 * time.Second

var tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub")
var (
tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub")

// ErrUnknownResourceType is returned when the corresponding resource type is not found for a resource id.
ErrUnknownResourceType = errors.New("unknown resource type")
)

// Subscriber is the subscriber client
type Subscriber struct {
ctx context.Context
changeChannels []<-chan events.Message[events.ChangeMessage]
changeChannels []<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]
logger *zap.SugaredLogger
subscriber events.Subscriber
subscriber events.AuthRelationshipSubscriber
qe query.Engine
}

Expand All @@ -39,7 +46,7 @@ func WithLogger(l *zap.SugaredLogger) SubscriberOption {
}

// NewSubscriber creates a new Subscriber
func NewSubscriber(ctx context.Context, subscriber events.Subscriber, engine query.Engine, opts ...SubscriberOption) (*Subscriber, error) {
func NewSubscriber(ctx context.Context, subscriber events.AuthRelationshipSubscriber, engine query.Engine, opts ...SubscriberOption) (*Subscriber, error) {
s := &Subscriber{
ctx: ctx,
logger: zap.NewNop().Sugar(),
Expand All @@ -56,7 +63,7 @@ func NewSubscriber(ctx context.Context, subscriber events.Subscriber, engine que

// Subscribe subscribes to a nats subject
func (s *Subscriber) Subscribe(topic string) error {
msgChan, err := s.subscriber.SubscribeChanges(s.ctx, topic)
msgChan, err := s.subscriber.SubscribeAuthRelationshipRequests(s.ctx, topic)
if err != nil {
return err
}
Expand All @@ -83,14 +90,15 @@ func (s Subscriber) Listen() error {
}

// listen listens for messages on a channel and calls the registered message handler
func (s Subscriber) listen(messages <-chan events.Message[events.ChangeMessage], wg *sync.WaitGroup) {
func (s Subscriber) listen(messages <-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], wg *sync.WaitGroup) {
defer wg.Done()

for msg := range messages {
elogger := s.logger.With(
"event.message.id", msg.ID(),
"event.message.timestamp", msg.Timestamp(),
"event.message.deliveries", msg.Deliveries(),
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
"event.message.object.id", msg.Message().ObjectID.String(),
"event.message.relations", len(msg.Message().Relations),
)

if err := s.processEvent(msg); err != nil {
Expand All @@ -100,17 +108,18 @@ func (s Subscriber) listen(messages <-chan events.Message[events.ChangeMessage],
elogger.Warnw("error occurred while naking", "error", nakErr)
}
} else if ackErr := msg.Ack(); ackErr != nil {
elogger.Warnw("error occurred while acking", "error", ackErr)
elogger.Errorw("error occurred while acking", "error", ackErr)
}
}
}

// processEvent event message handler
func (s *Subscriber) processEvent(msg events.Message[events.ChangeMessage]) error {
func (s *Subscriber) processEvent(msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.id", msg.ID(),
"event.message.timestamp", msg.Timestamp(),
"event.message.deliveries", msg.Deliveries(),
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
"event.message.object.id", msg.Message().ObjectID.String(),
"event.message.relations", len(msg.Message().Relations),
)

if msg.Error() != nil {
Expand All @@ -119,141 +128,212 @@ func (s *Subscriber) processEvent(msg events.Message[events.ChangeMessage]) erro
return msg.Error()
}

changeMsg := msg.Message()
request := msg.Message()

ctx := changeMsg.GetTraceContext(context.Background())
ctx := request.GetTraceContext(context.Background())

ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", changeMsg.SubjectID.String())))
ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", request.ObjectID.String())))

defer span.End()

elogger = elogger.With(
"event.resource.id", changeMsg.SubjectID.String(),
"event.type", changeMsg.EventType,
)

elogger.Debugw("received message")

var err error

switch events.ChangeType(changeMsg.EventType) {
case events.CreateChangeType:
switch request.Action {
case events.WriteAuthRelationshipAction:
err = s.handleCreateEvent(ctx, msg)
case events.UpdateChangeType:
err = s.handleUpdateEvent(ctx, msg)
case events.DeleteChangeType:
case events.DeleteAuthRelationshipAction:
err = s.handleDeleteEvent(ctx, msg)
default:
elogger.Warnw("ignoring msg, not a create, update or delete event")
elogger.Warnw("ignoring msg, not a write or delete action")
}

if err != nil {
return err
}

return nil
}

func (s *Subscriber) createRelationships(ctx context.Context, relationships []types.Relationship) error {
// Attempt to create the relationships in SpiceDB.
_, err := s.qe.CreateRelationships(ctx, relationships)
if err != nil {
return fmt.Errorf("%w: error creating relationships", err)
}

return nil
}

func (s *Subscriber) deleteRelationships(ctx context.Context, relationships []types.Relationship) error {
_, err := s.qe.DeleteRelationships(ctx, relationships...)
if err != nil {
return err
}

return nil
}

func (s *Subscriber) createRelationships(ctx context.Context, msg events.Message[events.ChangeMessage], resource types.Resource, additionalSubjectIDs []gidx.PrefixedID) error {
var relationships []types.Relationship
func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
"event.message.object.id", msg.Message().ObjectID.String(),
"event.message.relations", len(msg.Message().Relations),
)

var errors []error

if err := msg.Message().Validate(); err != nil {
errors = multierr.Errors(err)
}

resource, err := s.qe.NewResourceFromID(msg.Message().ObjectID)
if err != nil {
elogger.Warnw("error parsing resource ID", "error", err.Error())

respondRequest(ctx, elogger, msg, err)

return nil
}

rType := s.qe.GetResourceType(resource.Type)
if rType == nil {
s.logger.Warnw("no resource type found for", "resource_type", resource.Type)
elogger.Warnw("error finding resource type", "error", err.Error())

return nil
respondRequest(ctx, elogger, msg, fmt.Errorf("%w: resource: %s", ErrUnknownResourceType, resource.Type))
}

// Attempt to create relationships from the message fields. If this fails, reject the message
for _, id := range additionalSubjectIDs {
subjResource, err := s.qe.NewResourceFromID(id)
relationships := make([]types.Relationship, len(msg.Message().Relations))

for i, relation := range msg.Message().Relations {
subject, err := s.qe.NewResourceFromID(relation.SubjectID)
if err != nil {
s.logger.Warnw("error parsing additional subject id - will not reprocess", "event_type", events.CreateChangeType, "id", id.String(), "error", err.Error())
elogger.Warnw("error parsing subject ID", "error", err.Error())

errors = append(errors, fmt.Errorf("%w: relation %d", err, i))

continue
}

for _, rel := range rType.Relationships {
var relation string
sType := s.qe.GetResourceType(subject.Type)
if sType == nil {
elogger.Warnw("error finding subject resource type", "error", err.Error())

for _, tName := range rel.Types {
if tName == subjResource.Type {
relation = rel.Relation
errors = append(errors, fmt.Errorf("%w: relation %d subject: %s", ErrUnknownResourceType, i, subject.Type))

break
}
}

if relation != "" {
relationship := types.Relationship{
Resource: resource,
Relation: relation,
Subject: subjResource,
}
continue
}

relationships = append(relationships, relationship)
}
relationships[i] = types.Relationship{
Resource: resource,
Relation: relation.Relation,
Subject: subject,
}
}

if len(relationships) == 0 {
s.logger.Warnw("no relations to create for resource", "resource_type", resource.Type, "resource_id", resource.ID.String())

return nil
if len(errors) != 0 {
respondRequest(ctx, elogger, msg, errors...)
}

// Attempt to create the relationships in SpiceDB. If this fails, nak the message for reprocessing
_, err := s.qe.CreateRelationships(ctx, relationships)
if err != nil {
s.logger.Errorw("error creating relationships - will not reprocess", "error", err.Error())
}
err = s.createRelationships(ctx, relationships)

respondRequest(ctx, elogger, msg, err)

return nil
}

func (s *Subscriber) deleteRelationships(ctx context.Context, msg events.Message[events.ChangeMessage], resource types.Resource) error {
_, err := s.qe.DeleteRelationships(ctx, resource)
func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error {
elogger := s.logger.With(
"event.message.topic", msg.Topic(),
"event.message.action", msg.Message().Action,
"event.message.object.id", msg.Message().ObjectID.String(),
"event.message.relations", len(msg.Message().Relations),
)

var errors []error

if err := msg.Message().Validate(); err != nil {
errors = multierr.Errors(err)
}

resource, err := s.qe.NewResourceFromID(msg.Message().ObjectID)
if err != nil {
s.logger.Errorw("error deleting relationships - will not reprocess", "error", err.Error())
elogger.Warnw("error parsing resource ID", "error", err.Error())

errors = append(errors, err)
}

return nil
}
rType := s.qe.GetResourceType(resource.Type)
if rType == nil {
elogger.Warnw("error finding resource type", "error", err.Error())

func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Message[events.ChangeMessage]) error {
resource, err := s.qe.NewResourceFromID(msg.Message().SubjectID)
if err != nil {
s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", msg.Message().EventType, "error", err.Error())
errors = append(errors, fmt.Errorf("%w: resource: %s", ErrUnknownResourceType, resource.Type))
}

return nil
relationships := make([]types.Relationship, len(msg.Message().Relations))

for i, relation := range msg.Message().Relations {
subject, err := s.qe.NewResourceFromID(relation.SubjectID)
if err != nil {
elogger.Warnw("error parsing subject ID", "error", err.Error())

errors = append(errors, fmt.Errorf("%w: relation %d", err, i))

continue
}

sType := s.qe.GetResourceType(subject.Type)
if sType == nil {
elogger.Warnw("error finding subject resource type", "error", err.Error())

errors = append(errors, fmt.Errorf("%w: relation %d subject: %s", ErrUnknownResourceType, i, subject.Type))

continue
}

relationships[i] = types.Relationship{
Resource: resource,
Relation: relation.Relation,
Subject: subject,
}
}

return s.createRelationships(ctx, msg, resource, msg.Message().AdditionalSubjectIDs)
if len(errors) != 0 {
respondRequest(ctx, elogger, msg, errors...)
}

err = s.deleteRelationships(ctx, relationships)

respondRequest(ctx, elogger, msg, err)

return nil
}

func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Message[events.ChangeMessage]) error {
resource, err := s.qe.NewResourceFromID(msg.Message().SubjectID)
if err != nil {
s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", msg.Message().EventType, "error", err.Error())
func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], errors ...error) {
var filteredErrors []error

return nil
for _, err := range errors {
if err != nil {
filteredErrors = append(filteredErrors, err)
}
}

return s.deleteRelationships(ctx, msg, resource)
}
response := events.AuthRelationshipResponse{
Errors: filteredErrors,
}

func (s *Subscriber) handleUpdateEvent(ctx context.Context, msg events.Message[events.ChangeMessage]) error {
resource, err := s.qe.NewResourceFromID(msg.Message().SubjectID)
if err != nil {
s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", msg.Message().EventType, "error", err.Error())
if len(filteredErrors) != 0 {
err := multierr.Combine(filteredErrors...)

return nil
logger.Errorw("error processing relationship, sending error response", "error", err)
} else {
logger.Debug("relationship successfully processed, sending response")
}

err = s.deleteRelationships(ctx, msg, resource)
_, err := msg.Reply(ctx, response)
if err != nil {
return err
logger.Errorw("error sending response", "error", err)
}

return s.createRelationships(ctx, msg, resource, msg.Message().AdditionalSubjectIDs)
}
Loading

0 comments on commit f6c33a4

Please sign in to comment.