Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct event relationships #121

Merged
merged 6 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion chart/permissions-api/templates/deployment-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ spec:
{{- end }}
{{- if .Values.config.events.nats.token }}
- name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_TOKEN
value: "{{ .Values.config.events.token }}"
value: "{{ .Values.config.events.nats.token }}"
{{- end }}
{{- if .Values.config.oidc.issuer }}
{{- with .Values.config.oidc.audience }}
Expand Down
14 changes: 9 additions & 5 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy))
engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))

subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine)
subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine, pubsub.WithLogger(logger))
if err != nil {
logger.Fatalw("unable to initialize subscriber", "error", err)
}
Expand All @@ -75,9 +75,13 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
}
}

if err := subscriber.Listen(); err != nil {
logger.Fatalw("error listening for events", "error", err)
}
logger.Info("Listening for events")

go func() {
if err := subscriber.Listen(); err != nil {
logger.Fatalw("error listening for events", "error", err)
}
}()

// Wait until we're told to stop
sig := <-sigCh
Expand Down
43 changes: 31 additions & 12 deletions internal/pubsub/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (s *Subscriber) Subscribe(topic string) error {

s.changeChannels = append(s.changeChannels, msgChan)

s.logger.Infof("Subscribing to topic %s", topic)

return nil
}

Expand Down Expand Up @@ -163,6 +165,13 @@ func (s *Subscriber) processEvent(msg *message.Message) error {
func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Message, resource types.Resource, additionalSubjectIDs []gidx.PrefixedID) error {
var relationships []types.Relationship

rType := s.qe.GetResourceType(resource.Type)
if rType == nil {
s.logger.Warnw("no resource type found for", "resource_type", resource.Type)

return nil
}

// Attempt to create relationships from the message fields. If this fails, reject the message
for _, id := range additionalSubjectIDs {
subjResource, err := s.qe.NewResourceFromID(id)
Expand All @@ -172,13 +181,27 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
continue
}

relationship := types.Relationship{
Resource: resource,
Relation: subjResource.Type,
Subject: subjResource,
}
for _, rel := range rType.Relationships {
var relation string

for _, tName := range rel.Types {
if tName == subjResource.Type {
relation = rel.Relation

relationships = append(relationships, relationship)
break
}
}

if relation != "" {
relationship := types.Relationship{
Resource: resource,
Relation: relation,
Subject: subjResource,
}

relationships = append(relationships, relationship)
}
}
}

if len(relationships) == 0 {
Expand All @@ -190,9 +213,7 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
// Attempt to create the relationships in SpiceDB. If this fails, nak the message for reprocessing
_, err := s.qe.CreateRelationships(ctx, relationships)
if err != nil {
s.logger.Errorw("error creating relationships - will reprocess", "error", err.Error())

return err
s.logger.Errorw("error creating relationships - will not reprocess", "error", err.Error())
}

return nil
Expand All @@ -201,9 +222,7 @@ func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Messa
func (s *Subscriber) deleteRelationships(ctx context.Context, msg *message.Message, resource types.Resource) error {
_, err := s.qe.DeleteRelationships(ctx, resource)
if err != nil {
s.logger.Errorw("error deleting relationships - will reprocess", "error", err.Error())

return err
s.logger.Errorw("error deleting relationships - will not reprocess", "error", err.Error())
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/pubsub/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestNATS(t *testing.T) {
return context.WithValue(ctx, contextKeyEngine, &engine)
},
CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) {
require.ErrorIs(t, result.Err, eventtools.ErrNack)
require.NoError(t, result.Err)
},
},
{
Expand Down
9 changes: 9 additions & 0 deletions internal/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,13 @@ var (

// ErrInvalidReference represents an error condition where a given SpiceDB object reference is for some reason invalid.
ErrInvalidReference = errors.New("invalid reference")

// ErrInvalidNamespace represents an error when the id prefix is not found in the resource schema
ErrInvalidNamespace = errors.New("invalid namespace")

// ErrInvalidType represents an error when a resource type is not found in the resource schema
ErrInvalidType = errors.New("invalid type")

// ErrInvalidRelationship represents an error when no matching relationship was found
ErrInvalidRelationship = errors.New("invalid relationship")
)
15 changes: 15 additions & 0 deletions internal/query/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ func (e *Engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {
return out, nil
}

// GetResourceType returns the resource type by name
func (e *Engine) GetResourceType(name string) *types.ResourceType {
if e.schema == nil {
e.schema = iapl.DefaultPolicy().Schema()
}

for _, resourceType := range e.schema {
if resourceType.Name == name {
return &resourceType
}
}

return nil
}

// SubjectHasPermission returns nil to satisfy the Engine interface.
func (e *Engine) SubjectHasPermission(ctx context.Context, subject types.Resource, action string, resource types.Resource, queryToken string) error {
e.Called()
Expand Down
25 changes: 15 additions & 10 deletions internal/query/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package query

import (
"context"
"errors"
"io"
"strings"

Expand All @@ -13,20 +12,14 @@ import (

var roleSubjectRelation = "subject"

var (
errorInvalidNamespace = errors.New("invalid namespace")
errorInvalidType = errors.New("invalid type")
errorInvalidRelationship = errors.New("invalid relationship")
)

func (e *engine) getTypeForResource(res types.Resource) (types.ResourceType, error) {
for _, resType := range e.schema {
if res.Type == resType.Name {
return resType, nil
}
}

return types.ResourceType{}, errorInvalidType
return types.ResourceType{}, ErrInvalidType
}

func (e *engine) validateRelationship(rel types.Relationship) error {
Expand All @@ -40,6 +33,8 @@ func (e *engine) validateRelationship(rel types.Relationship) error {
return err
}

e.logger.Infow("validation relationship", "sub", subjType.Name, "rel", rel.Relation, "res", resType.Name)

for _, typeRel := range resType.Relationships {
// If we find a relation with a name and type that matches our relationship,
// return
Expand All @@ -53,7 +48,7 @@ func (e *engine) validateRelationship(rel types.Relationship) error {
}

// No matching relationship was found, so we should return an error
return errorInvalidRelationship
return ErrInvalidRelationship
}

func resourceToSpiceDBRef(namespace string, r types.Resource) *pb.ObjectReference {
Expand Down Expand Up @@ -441,7 +436,7 @@ func (e *engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {

rType, ok := e.schemaPrefixMap[prefix]
if !ok {
return types.Resource{}, errorInvalidNamespace
return types.Resource{}, ErrInvalidNamespace
}

out := types.Resource{
Expand All @@ -451,3 +446,13 @@ func (e *engine) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) {

return out, nil
}

// GetResourceType returns the resource type by name
func (e *engine) GetResourceType(name string) *types.ResourceType {
rType, ok := e.schemaTypeMap[name]
if !ok {
return nil
}

return &rType
}
2 changes: 1 addition & 1 deletion internal/query/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestRelationships(t *testing.T) {
Subject: parentRes,
},
CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[[]types.Relationship]) {
assert.ErrorIs(t, res.Err, errorInvalidRelationship)
assert.ErrorIs(t, res.Err, ErrInvalidRelationship)
},
},
{
Expand Down
10 changes: 7 additions & 3 deletions internal/query/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Engine interface {
ListRoles(ctx context.Context, resource types.Resource, queryToken string) ([]types.Role, error)
DeleteRelationships(ctx context.Context, resource types.Resource) (string, error)
NewResourceFromID(id gidx.PrefixedID) (types.Resource, error)
GetResourceType(name string) *types.ResourceType
SubjectHasPermission(ctx context.Context, subject types.Resource, action string, resource types.Resource, queryToken string) error
}

Expand All @@ -30,13 +31,16 @@ type engine struct {
client *authzed.Client
schema []types.ResourceType
schemaPrefixMap map[string]types.ResourceType
schemaTypeMap map[string]types.ResourceType
}

func (e *engine) cacheSchemaPrefixes() {
func (e *engine) cacheSchemaResources() {
e.schemaPrefixMap = make(map[string]types.ResourceType, len(e.schema))
e.schemaTypeMap = make(map[string]types.ResourceType, len(e.schema))

for _, res := range e.schema {
e.schemaPrefixMap[res.IDPrefix] = res
e.schemaTypeMap[res.Name] = res
}
}

Expand All @@ -55,7 +59,7 @@ func NewEngine(namespace string, client *authzed.Client, options ...Option) Engi
if e.schema == nil {
e.schema = iapl.DefaultPolicy().Schema()

e.cacheSchemaPrefixes()
e.cacheSchemaResources()
}

return e
Expand All @@ -76,6 +80,6 @@ func WithPolicy(policy iapl.Policy) Option {
return func(e *engine) {
e.schema = policy.Schema()

e.cacheSchemaPrefixes()
e.cacheSchemaResources()
}
}
4 changes: 2 additions & 2 deletions policy.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ actionbindings:
- relationshipaction:
relation: parent
actionname: loadbalancer_delete
- actionname: loadbalancer_create
- actionname: loadbalancer_get
typename: loadbalancer
conditions:
- rolebinding: {}
- relationshipaction:
relation: owner
actionname: loadbalancer_create
actionname: loadbalancer_get
- actionname: loadbalancer_update
typename: loadbalancer
conditions:
Expand Down