diff --git a/chart/permissions-api/templates/deployment-worker.yaml b/chart/permissions-api/templates/deployment-worker.yaml index f97e8b77..4a81d2d8 100644 --- a/chart/permissions-api/templates/deployment-worker.yaml +++ b/chart/permissions-api/templates/deployment-worker.yaml @@ -58,26 +58,47 @@ spec: - name: PERMISSIONSAPI_SERVER_TRUSTED_PROXIES value: "{{ join " " . }}" {{- end }} - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_URL - value: "{{ .Values.config.events.url }}" - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_TIMEOUT - value: "{{ .Values.config.events.timeout }}" - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_PREFIX - value: "{{ .Values.config.events.prefix }}" - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_QUEUEGROUP - value: "{{ .Values.config.events.queueGroup }}" + - name: PERMISSIONSAPI_EVENTS_NATS_URL + value: "{{ .Values.config.events.nats.url }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBEPREFIX + value: "{{ .Values.config.events.nats.subscribePrefix }}" + - name: PERMISSIONSAPI_EVENTS_NATS_QUEUEGROUP + value: "{{ .Values.config.events.nats.queueGroup }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SOURCE + value: "{{ .Values.config.events.nats.source }}" + - name: PERMISSIONSAPI_EVENTS_NATS_CONNECTTIMEOUT + value: "{{ .Values.config.events.nats.connectTimeout }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SHUTDOWNTIMEOUT + value: "{{ .Values.config.events.nats.shutdownTimeout }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERFETCHBATCHSIZE + value: "{{ .Values.config.events.nats.subscriberFetchBatchSize }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERFETCHTIMEOUT + value: "{{ .Values.config.events.nats.subscriberFetchTimeout }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERFETCHBACKOFF + value: "{{ .Values.config.events.nats.subscriberFetchBackoff }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERNOACKEXPLICIT + value: "{{ .Values.config.events.nats.subscriberNoAckExplicit }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERNOMANUALACK + value: "{{ .Values.config.events.nats.subscriberNoManualAck }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERDELIVERYPOLICY + value: "{{ .Values.config.events.nats.subscriberDeliveryPolicy }}" + - name: PERMISSIONSAPI_EVENTS_NATS_SUBSCRIBERSTARTSEQUENCE + value: "{{ .Values.config.events.nats.subscriberStartSequence }}" {{- with .Values.config.events.topics }} - name: PERMISSIONSAPI_EVENTS_TOPICS value: "{{ join " " . }}" {{- end }} + {{- if .Values.config.events.nats.tokenSecretName }} + - name: PERMISSIONSAPI_EVENTS_NATS_TOKEN + valueFrom: + secretKeyRef: + name: {{ .Values.config.events.nats.tokenSecretName }} + key: token + {{- end }} {{- if .Values.config.events.nats.credsSecretName }} - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_CREDSFILE + - name: PERMISSIONSAPI_EVENTS_NATS_CREDSFILE value: "{{ .Values.config.events.nats.credsFile }}" {{- end }} - {{- if .Values.config.events.nats.token }} - - name: PERMISSIONSAPI_EVENTS_SUBSCRIBER_NATS_TOKEN - value: "{{ .Values.config.events.nats.token }}" - {{- end }} {{- if .Values.config.oidc.issuer }} {{- with .Values.config.oidc.audience }} - name: PERMISSIONSAPI_OIDC_AUDIENCE diff --git a/chart/permissions-api/values.yaml b/chart/permissions-api/values.yaml index 5c5abc87..887df997 100644 --- a/chart/permissions-api/values.yaml +++ b/chart/permissions-api/values.yaml @@ -39,20 +39,39 @@ config: policyConfigMapName: "" events: - # url is the event server connection url - url: "" - # timeout is event connection timeout - timeout: "" - # prefix is the subscribe event prefix - prefix: "" - # queueGroup defines the events queue group - queueGroup: "" # topics are the list of topics to subscribe to topics: [] + # nats contains nats specific configuration nats: - # token is the nats auth token - token: "" + # url is the event server connection url + url: "" + # subscribePrefix is the subscribe event prefix + subscribePrefix: "" + # queueGroup defines the events queue group + queueGroup: "" + # source defines the source of the events (defaults to application name) + source: "" + # connectTimeout is event connection timeout + connectTimeout: "10s" + # shutdownTimeout is the shutdown grace period + shutdownTimeout: "5s" + # subscriberFetchBatchSize is the subscribers fetch batch size + subscriberFetchBatchSize: "20" + # subscriberFetchTimeout is the subscribers fetch timeout + subscriberFetchTimeout: "5s" + # subscriberFetchBackoff is the subscriber fetch retry delay + subscriberFetchBackoff: "5s" + # subscriberNoAckExplicit disables Ack Explicit + subscriberNoAckExplicit: false + # subscriberNoManualAck disables Manual Ack + subscriberNoManualAck: false + # subscriberDeliveryPolicy sets the delivery policy + subscriberDeliveryPolicy: "all" + # subscriberStartSequence is the subscribers consumer start sequence (subscriberDeliveryPolicy must be `start-sequence`) + subscriberStartSequence: 0 + # tokenSecretName is the secret to load the auth token + tokenSecretName: "" # credsSecretName is the secret to load the creds auth file from credsSecretName: "" # credsFile is the location to read the creds file from diff --git a/cmd/worker.go b/cmd/worker.go index f909b5ae..6e3ee2db 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -2,6 +2,10 @@ package cmd import ( "context" + "os" + "os/signal" + "syscall" + "time" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -9,7 +13,6 @@ import ( "go.infratographer.com/x/events" "go.infratographer.com/x/otelx" "go.infratographer.com/x/versionx" - "go.infratographer.com/x/viperx" "go.uber.org/zap" "go.infratographer.com/permissions-api/internal/config" @@ -19,6 +22,8 @@ import ( "go.infratographer.com/permissions-api/internal/spicedbx" ) +const shutdownTimeout = 10 * time.Second + var workerCmd = &cobra.Command{ Use: "worker", Short: "starts a permissions-api queue worker", @@ -31,11 +36,9 @@ func init() { rootCmd.AddCommand(workerCmd) otelx.MustViperFlags(viper.GetViper(), workerCmd.Flags()) - events.MustViperFlagsForSubscriber(viper.GetViper(), workerCmd.Flags()) + events.MustViperFlags(viper.GetViper(), workerCmd.Flags(), appName) echox.MustViperFlags(viper.GetViper(), workerCmd.Flags(), apiDefaultListen) - - workerCmd.PersistentFlags().StringSlice("events-topics", []string{}, "event topics to subscribe to") - viperx.MustBindFlag(viper.GetViper(), "events.topics", workerCmd.PersistentFlags().Lookup("events-topics")) + config.MustViperFlags(viper.GetViper(), workerCmd.Flags()) } func worker(ctx context.Context, cfg *config.AppConfig) { @@ -68,39 +71,76 @@ func worker(ctx context.Context, cfg *config.AppConfig) { engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger)) - subscriber, err := pubsub.NewSubscriber(ctx, cfg.Events.Subscriber, engine, pubsub.WithLogger(logger)) + events, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) + if err != nil { + logger.Fatalw("failed to initialize events", "error", err) + } + + subscriber, err := pubsub.NewSubscriber(ctx, events, engine, + pubsub.WithLogger(logger), + ) if err != nil { logger.Fatalw("unable to initialize subscriber", "error", err) } - defer subscriber.Close() + topics := cfg.Events.Topics - for _, topic := range viper.GetStringSlice("events.topics") { + // if no topics are defined, add all topics from the schema. + if len(topics) == 0 { + schema := policy.Schema() + + for _, rt := range schema { + topics = append(topics, "*."+rt.Name) + } + } + + for _, topic := range topics { if err := subscriber.Subscribe(topic); err != nil { logger.Fatalw("failed to subscribe to changes topic", "topic", topic, "error", err) } } - logger.Info("Listening for events") + srv, err := echox.NewServer(logger.Desugar(), cfg.Server, versionx.BuildDetails()) + if err != nil { + logger.Fatal("failed to initialize new server", zap.Error(err)) + } + + srv.AddReadinessCheck("spicedb", spicedbx.Healthcheck(spiceClient)) + + quit := make(chan os.Signal, 1) + + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) go func() { + logger.Info("Listening for events") + if err := subscriber.Listen(); err != nil { logger.Fatalw("error listening for events", "error", err) } }() - srv, err := echox.NewServer( - logger.Desugar(), - echox.ConfigFromViper(viper.GetViper()), - versionx.BuildDetails(), - ) - if err != nil { - logger.Fatal("failed to initialize new server", zap.Error(err)) + go func() { + if err := srv.Run(); err != nil { + logger.Fatal("failed to run server", zap.Error(err)) + } + }() + + var cancel func() + + select { + case <-quit: + logger.Info("signal caught, shutting down") + + ctx, cancel = context.WithTimeout(ctx, shutdownTimeout) + case <-ctx.Done(): + logger.Info("context done, shutting down") + + ctx, cancel = context.WithTimeout(context.Background(), shutdownTimeout) } - srv.AddReadinessCheck("spicedb", spicedbx.Healthcheck(spiceClient)) + defer cancel() - if err := srv.Run(); err != nil { - logger.Fatal("failed to run server", zap.Error(err)) + if err := events.Shutdown(ctx); err != nil { + logger.Fatalw("failed to shutdown events gracefully", "error", "err") } } diff --git a/go.mod b/go.mod index 8f546e64..f682b9bc 100644 --- a/go.mod +++ b/go.mod @@ -3,17 +3,15 @@ module go.infratographer.com/permissions-api go 1.20 require ( - github.com/ThreeDotsLabs/watermill v1.2.0 github.com/authzed/authzed-go v0.8.0 github.com/authzed/grpcutil v0.0.0-20230524151342-4caf7fd1108a github.com/labstack/echo/v4 v4.11.1 - github.com/nats-io/nats.go v1.27.1 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.16.0 github.com/stretchr/testify v1.8.4 - go.infratographer.com/x v0.3.4 + go.infratographer.com/x v0.3.5-0.20230804152936-1fb9cfe07da6 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 go.opentelemetry.io/otel v1.16.0 @@ -26,7 +24,6 @@ require ( require ( github.com/MicahParks/keyfunc/v2 v2.0.3 // indirect - github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -36,13 +33,11 @@ require ( github.com/envoyproxy/protoc-gen-validate v0.10.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect - github.com/garsue/watermillzap v1.2.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v5 v5.0.0 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect @@ -54,7 +49,6 @@ require ( github.com/labstack/echo-contrib v0.15.0 // indirect github.com/labstack/echo-jwt/v4 v4.2.0 // indirect github.com/labstack/gommon v0.4.0 // indirect - github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect @@ -65,9 +59,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.4.1 // indirect github.com/nats-io/nats-server/v2 v2.9.17 // indirect + github.com/nats-io/nats.go v1.27.1 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect - github.com/oklog/ulid v1.3.1 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect diff --git a/go.sum b/go.sum index 91830584..fa1077c5 100644 --- a/go.sum +++ b/go.sum @@ -44,10 +44,6 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/MicahParks/keyfunc/v2 v2.0.3 h1:uKUEOc+knRO0UoucONisgNPiT85V2s/W5c0FQYsd9kc= github.com/MicahParks/keyfunc/v2 v2.0.3/go.mod h1:rW42fi+xgLJ2FRRXAfNx9ZA8WpD4OeE/yHVMteCkw9k= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/ThreeDotsLabs/watermill v1.2.0 h1:TU3TML1dnQ/ifK09F2+4JQk2EKhmhXe7Qv7eb5ZpTS8= -github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4= -github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0 h1:ZbdQ+cHwOZmXByEoKUH8SS6qR/erNQfrsNpvH5z/gfk= -github.com/ThreeDotsLabs/watermill-nats/v2 v2.0.0/go.mod h1:X6pcl579pScj4mII3KM/WJ+bcOqORqiCToy92f4gqJ4= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/authzed/authzed-go v0.8.0 h1:gb4X+7RxVqXSCFReAnKmSda68TBIqRdc47W2spLqoEc= github.com/authzed/authzed-go v0.8.0/go.mod h1:h9Zar1MSSrVsqbcbE5/RO7gpg6Fx5QYW2C5QduSox5M= @@ -100,8 +96,6 @@ github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -github.com/garsue/watermillzap v1.2.0 h1:IA0zGb5b7mIGLXN9P2/6CmP5+f7Qgb00BdL2VCAk2SA= -github.com/garsue/watermillzap v1.2.0/go.mod h1:uo3SDSGYaw6RBzUx9jcHMYqypOTqlQ4/vz+8r1olRto= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -180,9 +174,7 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= @@ -192,8 +184,6 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA= -github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= @@ -229,8 +219,6 @@ github.com/labstack/echo/v4 v4.11.1 h1:dEpLU2FLg4UVmvCGPuk/APjlH6GDpbEPti61srUUU github.com/labstack/echo/v4 v4.11.1/go.mod h1:YuYRTSM3CHs2ybfrL8Px48bO6BAnYIN4l8wSTMP6BDQ= github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8= github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= -github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= -github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= @@ -242,6 +230,8 @@ github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APP github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/mikemrm/infratographer-x v0.0.0-20230804122801-b75f397f9d1a h1:eXzGWSXL+ltVGXso3Ji46f66ob78s8FT/g/5Nixs+HQ= +github.com/mikemrm/infratographer-x v0.0.0-20230804122801-b75f397f9d1a/go.mod h1:AMNcTkqb+yHLCbnZtiiHTC7QvN+4MOpzdOhqHXfKQUk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= @@ -261,8 +251,6 @@ github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= -github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= @@ -325,8 +313,8 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.infratographer.com/x v0.3.4 h1:K7azcoiLZPRdOnr4M7DMyB2DjZzXRVcfr7G6FeQd16o= -go.infratographer.com/x v0.3.4/go.mod h1:pXXSdeJBisAK3AdED5EFj7Yo8z8td7fOWDkNl4Dkp0s= +go.infratographer.com/x v0.3.5-0.20230804152936-1fb9cfe07da6 h1:XDsi9eXtvIyyrvn+S/H3siQl8ccfV+QJ5qU4+jGGQuU= +go.infratographer.com/x v0.3.5-0.20230804152936-1fb9cfe07da6/go.mod h1:AMNcTkqb+yHLCbnZtiiHTC7QvN+4MOpzdOhqHXfKQUk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/internal/api/relationships.go b/internal/api/relationships.go index fec54c61..546c5371 100644 --- a/internal/api/relationships.go +++ b/internal/api/relationships.go @@ -199,7 +199,7 @@ func (r *Router) relationshipDelete(c echo.Context) error { Subject: relatedResource, } - _, err = r.engine.DeleteRelationship(ctx, relationship) + _, err = r.engine.DeleteRelationships(ctx, relationship) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "error deleting relationship").SetInternal(err) } diff --git a/internal/config/config.go b/internal/config/config.go index d3fcb026..cd177b89 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -2,18 +2,22 @@ package config import ( + "github.com/spf13/pflag" + "github.com/spf13/viper" "go.infratographer.com/x/echojwtx" "go.infratographer.com/x/echox" "go.infratographer.com/x/events" "go.infratographer.com/x/loggingx" "go.infratographer.com/x/otelx" + "go.infratographer.com/x/viperx" "go.infratographer.com/permissions-api/internal/spicedbx" ) // EventsConfig stores the configuration for a load-balancer-api events config type EventsConfig struct { - Subscriber events.SubscriberConfig + events.Config `mapstructure:",squash"` + Topics []string } // AppConfig is the struct used for configuring the app @@ -25,3 +29,9 @@ type AppConfig struct { Tracing otelx.Config Events EventsConfig } + +// MustViperFlags sets the cobra flags and viper config for events. +func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet) { + flags.StringSlice("events-topics", []string{}, "event topics to subscribe to") + viperx.MustBindFlag(v, "events.topics", flags.Lookup("events-topics")) +} diff --git a/internal/pubsub/subscriber.go b/internal/pubsub/subscriber.go index d1a3990e..dc74f9d7 100644 --- a/internal/pubsub/subscriber.go +++ b/internal/pubsub/subscriber.go @@ -2,32 +2,37 @@ package pubsub import ( "context" + "errors" + "fmt" "sync" + "time" - nc "github.com/nats-io/nats.go" "go.infratographer.com/permissions-api/internal/query" "go.infratographer.com/permissions-api/internal/types" "go.infratographer.com/x/events" - "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" "go.uber.org/zap" - - "github.com/ThreeDotsLabs/watermill/message" ) +const nakDelay = 10 * time.Second + var ( tracer = otel.Tracer("go.infratographer.com/permissions-api/internal/pubsub") + + // ErrUnknownResourceType is returned when the corresponding resource type is not found for a resource id. + ErrUnknownResourceType = errors.New("unknown resource type") ) // Subscriber is the subscriber client type Subscriber struct { ctx context.Context - changeChannels []<-chan *message.Message + changeChannels []<-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse] logger *zap.SugaredLogger - subscriber *events.Subscriber - subOpts []nc.SubOpt + subscriber events.AuthRelationshipSubscriber qe query.Engine } @@ -41,48 +46,31 @@ func WithLogger(l *zap.SugaredLogger) SubscriberOption { } } -// WithNatsSubOpts sets the logger for the Subscriber -func WithNatsSubOpts(options ...nc.SubOpt) SubscriberOption { - return func(s *Subscriber) { - s.subOpts = append(s.subOpts, options...) - } -} - // NewSubscriber creates a new Subscriber -func NewSubscriber(ctx context.Context, cfg events.SubscriberConfig, engine query.Engine, opts ...SubscriberOption) (*Subscriber, error) { +func NewSubscriber(ctx context.Context, subscriber events.AuthRelationshipSubscriber, engine query.Engine, opts ...SubscriberOption) (*Subscriber, error) { s := &Subscriber{ - ctx: ctx, - logger: zap.NewNop().Sugar(), - qe: engine, + ctx: ctx, + logger: zap.NewNop().Sugar(), + qe: engine, + subscriber: subscriber, } for _, opt := range opts { opt(s) } - sub, err := events.NewSubscriber(cfg, s.subOpts...) - if err != nil { - return nil, err - } - - s.subscriber = sub - - s.logger.Debugw("subscriber configuration", "config", cfg) - return s, nil } // Subscribe subscribes to a nats subject func (s *Subscriber) Subscribe(topic string) error { - msgChan, err := s.subscriber.SubscribeChanges(s.ctx, topic) + msgChan, err := s.subscriber.SubscribeAuthRelationshipRequests(s.ctx, topic) if err != nil { return err } s.changeChannels = append(s.changeChannels, msgChan) - s.logger.Infof("Subscribing to topic %s", topic) - return nil } @@ -103,60 +91,84 @@ func (s Subscriber) Listen() error { } // listen listens for messages on a channel and calls the registered message handler -func (s Subscriber) listen(messages <-chan *message.Message, wg *sync.WaitGroup) { +func (s Subscriber) listen(messages <-chan events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], wg *sync.WaitGroup) { defer wg.Done() for msg := range messages { + elogger := s.logger.With( + "event.message.topic", msg.Topic(), + "event.message.action", msg.Message().Action, + "event.message.object.id", msg.Message().ObjectID.String(), + "event.message.relations", len(msg.Message().Relations), + ) + if err := s.processEvent(msg); err != nil { - s.logger.Warn("Failed to process msg: ", err) + elogger.Errorw("failed to process msg", "error", err) - msg.Nack() - } else { - msg.Ack() + if nakErr := msg.Nak(nakDelay); nakErr != nil { + elogger.Warnw("error occurred while naking", "error", nakErr) + } + } else if ackErr := msg.Ack(); ackErr != nil { + elogger.Errorw("error occurred while acking", "error", ackErr) } } } -// Close closes the subscriber connection and unsubscribes from all subscriptions -func (s *Subscriber) Close() error { - return s.subscriber.Close() -} - // processEvent event message handler -func (s *Subscriber) processEvent(msg *message.Message) error { - changeMsg, err := events.UnmarshalChangeMessage(msg.Payload) - if err != nil { - s.logger.Errorw("failed to process data in msg", zap.Error(err)) - - return err +func (s *Subscriber) processEvent(msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { + elogger := s.logger.With( + "event.message.topic", msg.Topic(), + "event.message.action", msg.Message().Action, + "event.message.object.id", msg.Message().ObjectID.String(), + "event.message.relations", len(msg.Message().Relations), + ) + + if msg.Error() != nil { + elogger.Errorw("message contains error:", "error", msg.Error()) + + return msg.Error() } - ctx := events.TraceContextFromChangeMessage(context.Background(), changeMsg) + request := msg.Message() + + ctx := request.GetTraceContext(context.Background()) - ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", changeMsg.SubjectID.String()))) + ctx, span := tracer.Start(ctx, "pubsub.receive", trace.WithAttributes(attribute.String("pubsub.subject", request.ObjectID.String()))) defer span.End() - resource, err := s.qe.NewResourceFromID(changeMsg.SubjectID) - if err != nil { - s.logger.Warnw("invalid subject id", "error", err.Error()) + elogger.Debugw("received message") + + var err error - return nil + switch request.Action { + case events.WriteAuthRelationshipAction: + err = s.handleCreateEvent(ctx, msg) + case events.DeleteAuthRelationshipAction: + err = s.handleDeleteEvent(ctx, msg) + default: + elogger.Warnw("ignoring msg, not a write or delete action") } - s.logger.Debugw("received message", "resource_type", resource.Type, "resource_id", resource.ID.String(), "event_type", changeMsg.EventType) + if err != nil { + return err + } - switch events.ChangeType(changeMsg.EventType) { - case events.CreateChangeType: - err = s.handleCreateEvent(ctx, msg, changeMsg) - case events.UpdateChangeType: - err = s.handleUpdateEvent(ctx, msg, changeMsg) - case events.DeleteChangeType: - err = s.handleDeleteEvent(ctx, msg, changeMsg) - default: - s.logger.Warnw("ignoring msg, not a create, update or delete event", "event_type", changeMsg.EventType) + return nil +} + +func (s *Subscriber) createRelationships(ctx context.Context, relationships []types.Relationship) error { + // Attempt to create the relationships in SpiceDB. + _, err := s.qe.CreateRelationships(ctx, relationships) + if err != nil { + return fmt.Errorf("%w: error creating relationships", err) } + return nil +} + +func (s *Subscriber) deleteRelationships(ctx context.Context, relationships []types.Relationship) error { + _, err := s.qe.DeleteRelationships(ctx, relationships...) if err != nil { return err } @@ -164,106 +176,170 @@ func (s *Subscriber) processEvent(msg *message.Message) error { return nil } -func (s *Subscriber) createRelationships(ctx context.Context, msg *message.Message, resource types.Resource, additionalSubjectIDs []gidx.PrefixedID) error { - var relationships []types.Relationship +func (s *Subscriber) handleCreateEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { + elogger := s.logger.With( + "event.message.topic", msg.Topic(), + "event.message.action", msg.Message().Action, + "event.message.object.id", msg.Message().ObjectID.String(), + "event.message.relations", len(msg.Message().Relations), + ) + + var errors []error + + if err := msg.Message().Validate(); err != nil { + errors = multierr.Errors(err) + } + + resource, err := s.qe.NewResourceFromID(msg.Message().ObjectID) + if err != nil { + elogger.Warnw("error parsing resource ID", "error", err.Error()) + + return respondRequest(ctx, elogger, msg, err) + } rType := s.qe.GetResourceType(resource.Type) if rType == nil { - s.logger.Warnw("no resource type found for", "resource_type", resource.Type) + elogger.Warnw("error finding resource type", "error", err.Error()) - return nil + return respondRequest(ctx, elogger, msg, fmt.Errorf("%w: resource: %s", ErrUnknownResourceType, resource.Type)) } - // Attempt to create relationships from the message fields. If this fails, reject the message - for _, id := range additionalSubjectIDs { - subjResource, err := s.qe.NewResourceFromID(id) + relationships := make([]types.Relationship, len(msg.Message().Relations)) + + for i, relation := range msg.Message().Relations { + subject, err := s.qe.NewResourceFromID(relation.SubjectID) if err != nil { - s.logger.Warnw("error parsing additional subject id - will not reprocess", "event_type", events.CreateChangeType, "id", id.String(), "error", err.Error()) + elogger.Warnw("error parsing subject ID", "error", err.Error()) + + errors = append(errors, fmt.Errorf("%w: relation %d", err, i)) continue } - for _, rel := range rType.Relationships { - var relation string - - for _, tName := range rel.Types { - if tName == subjResource.Type { - relation = rel.Relation + sType := s.qe.GetResourceType(subject.Type) + if sType == nil { + elogger.Warnw("error finding subject resource type", "error", err.Error()) - break - } - } + errors = append(errors, fmt.Errorf("%w: relation %d subject: %s", ErrUnknownResourceType, i, subject.Type)) - if relation != "" { - relationship := types.Relationship{ - Resource: resource, - Relation: relation, - Subject: subjResource, - } + continue + } - relationships = append(relationships, relationship) - } + relationships[i] = types.Relationship{ + Resource: resource, + Relation: relation.Relation, + Subject: subject, } } - if len(relationships) == 0 { - s.logger.Warnw("no relations to create for resource", "resource_type", resource.Type, "resource_id", resource.ID.String()) + if len(errors) != 0 { + return respondRequest(ctx, elogger, msg, errors...) + } + + err = s.createRelationships(ctx, relationships) - return nil + return respondRequest(ctx, elogger, msg, err) +} + +func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse]) error { + elogger := s.logger.With( + "event.message.topic", msg.Topic(), + "event.message.action", msg.Message().Action, + "event.message.object.id", msg.Message().ObjectID.String(), + "event.message.relations", len(msg.Message().Relations), + ) + + var errors []error + + if err := msg.Message().Validate(); err != nil { + errors = multierr.Errors(err) } - // Attempt to create the relationships in SpiceDB. If this fails, nak the message for reprocessing - _, err := s.qe.CreateRelationships(ctx, relationships) + resource, err := s.qe.NewResourceFromID(msg.Message().ObjectID) if err != nil { - s.logger.Errorw("error creating relationships - will not reprocess", "error", err.Error()) + elogger.Warnw("error parsing resource ID", "error", err.Error()) + + errors = append(errors, err) } - return nil -} + rType := s.qe.GetResourceType(resource.Type) + if rType == nil { + elogger.Warnw("error finding resource type", "error", err.Error()) -func (s *Subscriber) deleteRelationships(ctx context.Context, msg *message.Message, resource types.Resource) error { - _, err := s.qe.DeleteRelationships(ctx, resource) - if err != nil { - s.logger.Errorw("error deleting relationships - will not reprocess", "error", err.Error()) + errors = append(errors, fmt.Errorf("%w: resource: %s", ErrUnknownResourceType, resource.Type)) } - return nil -} + relationships := make([]types.Relationship, len(msg.Message().Relations)) -func (s *Subscriber) handleCreateEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error { - resource, err := s.qe.NewResourceFromID(changeMsg.SubjectID) - if err != nil { - s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", changeMsg.EventType, "error", err.Error()) + for i, relation := range msg.Message().Relations { + subject, err := s.qe.NewResourceFromID(relation.SubjectID) + if err != nil { + elogger.Warnw("error parsing subject ID", "error", err.Error()) + + errors = append(errors, fmt.Errorf("%w: relation %d", err, i)) + + continue + } + + sType := s.qe.GetResourceType(subject.Type) + if sType == nil { + elogger.Warnw("error finding subject resource type", "error", err.Error()) + + errors = append(errors, fmt.Errorf("%w: relation %d subject: %s", ErrUnknownResourceType, i, subject.Type)) + + continue + } + + relationships[i] = types.Relationship{ + Resource: resource, + Relation: relation.Relation, + Subject: subject, + } + } - return nil + if len(errors) != 0 { + return respondRequest(ctx, elogger, msg, errors...) } - return s.createRelationships(ctx, msg, resource, changeMsg.AdditionalSubjectIDs) + err = s.deleteRelationships(ctx, relationships) + + return respondRequest(ctx, elogger, msg, err) } -func (s *Subscriber) handleDeleteEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error { - resource, err := s.qe.NewResourceFromID(changeMsg.SubjectID) - if err != nil { - s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", changeMsg.EventType, "error", err.Error()) +func respondRequest(ctx context.Context, logger *zap.SugaredLogger, msg events.Request[events.AuthRelationshipRequest, events.AuthRelationshipResponse], errors ...error) error { + ctx, span := tracer.Start(ctx, "pubsub.respond") - return nil + defer span.End() + + var filteredErrors []error + + for _, err := range errors { + if err != nil { + filteredErrors = append(filteredErrors, err) + } } - return s.deleteRelationships(ctx, msg, resource) -} + response := events.AuthRelationshipResponse{ + Errors: filteredErrors, + } -func (s *Subscriber) handleUpdateEvent(ctx context.Context, msg *message.Message, changeMsg events.ChangeMessage) error { - resource, err := s.qe.NewResourceFromID(changeMsg.SubjectID) - if err != nil { - s.logger.Warnw("error parsing subject ID - will not reprocess", "event_type", changeMsg.EventType, "error", err.Error()) + if len(filteredErrors) != 0 { + err := multierr.Combine(filteredErrors...) - return nil + logger.Errorw("error processing relationship, sending error response", "error", err) + } else { + logger.Debug("relationship successfully processed, sending response") } - err = s.deleteRelationships(ctx, msg, resource) + _, err := msg.Reply(ctx, response) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + logger.Errorw("error sending response", "error", err) + return err } - return s.createRelationships(ctx, msg, resource, changeMsg.AdditionalSubjectIDs) + return nil } diff --git a/internal/pubsub/subscriber_test.go b/internal/pubsub/subscriber_test.go index c26abe08..8ce35c53 100644 --- a/internal/pubsub/subscriber_test.go +++ b/internal/pubsub/subscriber_test.go @@ -2,13 +2,10 @@ package pubsub import ( "context" - "fmt" "io" - "math/rand" "testing" "time" - nc "github.com/nats-io/nats.go" "go.infratographer.com/permissions-api/internal/query" "go.infratographer.com/permissions-api/internal/query/mock" "go.infratographer.com/permissions-api/internal/testingx" @@ -19,94 +16,78 @@ import ( "github.com/stretchr/testify/require" ) -const ( - charSet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - consumerLen = 8 - sampleFrequency = "100" -) - var contextKeyEngine = struct{}{} -func newConsumerName() string { - b := make([]byte, consumerLen) - - for i := range b { - b[i] = charSet[rand.Intn(len(charSet))] - } - - return fmt.Sprintf("consumer-%s", string(b)) -} +func setupEvents(t *testing.T, engine query.Engine) (*eventtools.TestNats, events.AuthRelationshipPublisher, *Subscriber) { + ctx := context.Background() -func setupEvents(t *testing.T, engine query.Engine) (*eventtools.TestNats, *events.Publisher, *Subscriber, string) { nats, err := eventtools.NewNatsServer() require.NoError(t, err) - publisher, err := events.NewPublisher(nats.PublisherConfig) + eventHandler, err := events.NewNATSConnection(nats.Config.NATS) require.NoError(t, err) - consumerName := newConsumerName() - - subscriber, err := NewSubscriber(context.Background(), nats.SubscriberConfig, engine, - WithNatsSubOpts( - nc.ManualAck(), - nc.AckExplicit(), - nc.Durable(consumerName), - ), - ) + subscriber, err := NewSubscriber(ctx, eventHandler, engine) require.NoError(t, err) t.Cleanup(func() { nats.Close() - publisher.Close() //nolint:errcheck - subscriber.Close() //nolint:errcheck + eventHandler.Shutdown(ctx) //nolint:errcheck }) - return nats, publisher, subscriber, consumerName + return nats, eventHandler, subscriber } func TestNATS(t *testing.T) { type testInput struct { - subject string - changeMessage events.ChangeMessage + subject string + request events.AuthRelationshipRequest } - createMsg := events.ChangeMessage{ - SubjectID: gidx.PrefixedID("loadbal-UCN7pxJO57BV_5pNiV95B"), - EventType: string(events.CreateChangeType), - AdditionalSubjectIDs: []gidx.PrefixedID{ - gidx.PrefixedID("othrsid-kXboa2UZbaNzMhng9vVha"), - gidx.PrefixedID("tnntten-gd6RExwAz353UqHLzjC1n"), + createMsg := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: gidx.PrefixedID("loadbal-UCN7pxJO57BV_5pNiV95B"), + Relations: []events.AuthRelationshipRelation{ + { + Relation: "owner", + SubjectID: gidx.PrefixedID("tnntten-gd6RExwAz353UqHLzjC1n"), + }, }, } - noCreateMsg := events.ChangeMessage{ - SubjectID: gidx.PrefixedID("loadbal-EA8CJagJPM4J-yw6_skd1"), - EventType: string(events.CreateChangeType), - AdditionalSubjectIDs: []gidx.PrefixedID{ - gidx.PrefixedID("othrsid-kXboa2UZbaNzMhng9vVha"), + noCreateMsg := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: gidx.PrefixedID("loadbal-EA8CJagJPM4J-yw6_skd1"), + Relations: []events.AuthRelationshipRelation{ + { + Relation: "owner", + }, }, } - updateMsg := events.ChangeMessage{ - SubjectID: gidx.PrefixedID("loadbal-UCN7pxJO57BV_5pNiV95B"), - EventType: string(events.UpdateChangeType), - AdditionalSubjectIDs: []gidx.PrefixedID{ - gidx.PrefixedID("othrsid-kXboa2UZbaNzMhng9vVha"), - gidx.PrefixedID("tnntten-gd6RExwAz353UqHLzjC1n"), + deleteMsg := events.AuthRelationshipRequest{ + Action: events.DeleteAuthRelationshipAction, + ObjectID: gidx.PrefixedID("loadbal-UCN7pxJO57BV_5pNiV95B"), + Relations: []events.AuthRelationshipRelation{ + { + Relation: "owner", + SubjectID: gidx.PrefixedID("tnntten-gd6RExwAz353UqHLzjC1n"), + }, }, } - deleteMsg := events.ChangeMessage{ - SubjectID: gidx.PrefixedID("loadbal-UCN7pxJO57BV_5pNiV95B"), - EventType: string(events.DeleteChangeType), - } - - unknownResourceMsg := events.ChangeMessage{ - SubjectID: gidx.PrefixedID("baddres-BfqAzfYxtFNlpKPGYLmra"), - EventType: string(events.CreateChangeType), + unknownResourceMsg := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: gidx.PrefixedID("baddres-BfqAzfYxtFNlpKPGYLmra"), + Relations: []events.AuthRelationshipRelation{ + { + Relation: "owner", + SubjectID: gidx.PrefixedID("tnntten-gd6RExwAz353UqHLzjC1n"), + }, + }, } // Each of these tests works as follows: @@ -118,12 +99,12 @@ func TestNATS(t *testing.T) { // // When writing tests, make sure the subject prefix in the test input matches the prefix provided in // setupClient, or else you will get undefined, racy behavior. - testCases := []testingx.TestCase[testInput, *Subscriber]{ + testCases := []testingx.TestCase[testInput, events.Message[events.AuthRelationshipResponse]]{ { Name: "goodcreate", Input: testInput{ - subject: "goodcreate.loadbalancer", - changeMessage: createMsg, + subject: "goodcreate.loadbalancer", + request: createMsg, }, SetupFn: func(ctx context.Context, t *testing.T) context.Context { var engine mock.Engine @@ -131,18 +112,20 @@ func TestNATS(t *testing.T) { return context.WithValue(ctx, contextKeyEngine, &engine) }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { + CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[events.Message[events.AuthRelationshipResponse]]) { require.NoError(t, result.Err) + require.NotNil(t, result.Success) + require.Empty(t, result.Success.Message().Errors) - engine := result.Success.qe.(*mock.Engine) + engine := ctx.Value(contextKeyEngine).(*mock.Engine) engine.AssertExpectations(t) }, }, { Name: "errcreate", Input: testInput{ - subject: "errcreate.loadbalancer", - changeMessage: createMsg, + subject: "errcreate.loadbalancer", + request: createMsg, }, SetupFn: func(ctx context.Context, t *testing.T) context.Context { var engine mock.Engine @@ -150,50 +133,34 @@ func TestNATS(t *testing.T) { return context.WithValue(ctx, contextKeyEngine, &engine) }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { + CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[events.Message[events.AuthRelationshipResponse]]) { require.NoError(t, result.Err) + require.NotNil(t, result.Success) + require.NotEmpty(t, result.Success.Message().Errors) }, }, { Name: "nocreate", Input: testInput{ - subject: "nocreate.loadbalancer", - changeMessage: noCreateMsg, - }, - SetupFn: func(ctx context.Context, t *testing.T) context.Context { - var engine mock.Engine - - return context.WithValue(ctx, contextKeyEngine, &engine) - }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { - require.NoError(t, result.Err) - }, - }, - { - Name: "goodupdate", - Input: testInput{ - subject: "goodupdate.loadbalancer", - changeMessage: updateMsg, + subject: "nocreate.loadbalancer", + request: noCreateMsg, }, SetupFn: func(ctx context.Context, t *testing.T) context.Context { var engine mock.Engine - engine.On("DeleteRelationships").Return("", nil) - engine.On("CreateRelationships").Return("", nil) return context.WithValue(ctx, contextKeyEngine, &engine) }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { - require.NoError(t, result.Err) - - engine := result.Success.qe.(*mock.Engine) - engine.AssertExpectations(t) + CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[events.Message[events.AuthRelationshipResponse]]) { + require.Error(t, result.Err) + require.ErrorIs(t, result.Err, events.ErrMissingAuthRelationshipRequestRelationSubjectID) + require.Nil(t, result.Success) }, }, { Name: "gooddelete", Input: testInput{ - subject: "gooddelete.loadbalancer", - changeMessage: deleteMsg, + subject: "gooddelete.loadbalancer", + request: deleteMsg, }, SetupFn: func(ctx context.Context, t *testing.T) context.Context { var engine mock.Engine @@ -202,42 +169,44 @@ func TestNATS(t *testing.T) { return context.WithValue(ctx, contextKeyEngine, &engine) }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { + CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[events.Message[events.AuthRelationshipResponse]]) { require.NoError(t, result.Err) + require.NotNil(t, result.Success) + require.Empty(t, result.Success.Message().Errors) - engine := result.Success.qe.(*mock.Engine) + engine := ctx.Value(contextKeyEngine).(*mock.Engine) engine.AssertExpectations(t) }, }, { Name: "badresource", Input: testInput{ - subject: "badresource.fakeresource", - changeMessage: unknownResourceMsg, + subject: "badresource.fakeresource", + request: unknownResourceMsg, }, SetupFn: func(ctx context.Context, t *testing.T) context.Context { var engine mock.Engine return context.WithValue(ctx, contextKeyEngine, &engine) }, - CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[*Subscriber]) { + CheckFn: func(ctx context.Context, t *testing.T, result testingx.TestResult[events.Message[events.AuthRelationshipResponse]]) { require.NoError(t, result.Err) + require.NotNil(t, result.Success) + require.NotEmpty(t, result.Success.Message().Errors) }, }, } - testFn := func(ctx context.Context, input testInput) testingx.TestResult[*Subscriber] { + testFn := func(ctx context.Context, input testInput) testingx.TestResult[events.Message[events.AuthRelationshipResponse]] { engine := ctx.Value(contextKeyEngine).(query.Engine) - nats, pub, sub, consumerName := setupEvents(t, engine) + _, pub, sub := setupEvents(t, engine) err := sub.Subscribe("*." + input.subject) require.NoError(t, err) go func() { - defer sub.Close() - err = sub.Listen() require.NoError(t, err) @@ -246,28 +215,13 @@ func TestNATS(t *testing.T) { // Allow time for the listener to to start time.Sleep(time.Second) - err = nats.SetConsumerSampleFrequency(consumerName, sampleFrequency) - - require.NoError(t, err) - - ackErr := make(chan error, 1) - - go func() { - ackErr <- nats.WaitForAck(consumerName, 5*time.Second) - }() - - err = pub.PublishChange(ctx, input.subject, input.changeMessage) - require.NoError(t, err) - if err = <-ackErr; err != nil { - return testingx.TestResult[*Subscriber]{ - Err: err, - } - } + resp, err := pub.PublishAuthRelationshipRequest(ctx, input.subject, input.request) - return testingx.TestResult[*Subscriber]{ - Success: sub, + return testingx.TestResult[events.Message[events.AuthRelationshipResponse]]{ + Err: err, + Success: resp, } } diff --git a/internal/query/mock/mock.go b/internal/query/mock/mock.go index 5105f077..e20ace54 100644 --- a/internal/query/mock/mock.go +++ b/internal/query/mock/mock.go @@ -87,8 +87,8 @@ func (e *Engine) ListRoles(ctx context.Context, resource types.Resource, queryTo return nil, nil } -// DeleteRelationship does nothing but satisfies the Engine interface. -func (e *Engine) DeleteRelationship(ctx context.Context, rel types.Relationship) (string, error) { +// DeleteRelationships does nothing but satisfies the Engine interface. +func (e *Engine) DeleteRelationships(ctx context.Context, relationships ...types.Relationship) (string, error) { args := e.Called() return args.String(0), args.Error(1) @@ -101,8 +101,8 @@ func (e *Engine) DeleteRole(ctx context.Context, roleResource types.Resource, qu return args.String(0), args.Error(1) } -// DeleteRelationships does nothing but satisfies the Engine interface. -func (e *Engine) DeleteRelationships(ctx context.Context, resource types.Resource) (string, error) { +// DeleteResourceRelationships does nothing but satisfies the Engine interface. +func (e *Engine) DeleteResourceRelationships(ctx context.Context, resource types.Resource) (string, error) { args := e.Called() return args.String(0), args.Error(1) diff --git a/internal/query/relations.go b/internal/query/relations.go index dfcc1cc1..0ed11dae 100644 --- a/internal/query/relations.go +++ b/internal/query/relations.go @@ -9,6 +9,10 @@ import ( pb "github.com/authzed/authzed-go/proto/authzed/api/v1" "go.infratographer.com/permissions-api/internal/types" "go.infratographer.com/x/gidx" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" ) var roleSubjectRelation = "subject" @@ -190,9 +194,16 @@ func (e *engine) checkPermission(ctx context.Context, req *pb.CheckPermissionReq // CreateRelationships atomically creates the given relationships in SpiceDB. func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relationship) (string, error) { + ctx, span := tracer.Start(ctx, "engine.CreateRelationships", trace.WithAttributes(attribute.Int("relationships", len(rels)))) + + defer span.End() + for _, rel := range rels { err := e.validateRelationship(rel) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return "", err } } @@ -205,6 +216,9 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations r, err := e.client.WriteRelationships(ctx, request) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return "", err } @@ -330,31 +344,101 @@ func (e *engine) readRelationships(ctx context.Context, filter *pb.RelationshipF return responses, nil } -// DeleteRelationship removes the specified relationship between the two resources. -func (e *engine) DeleteRelationship(ctx context.Context, rel types.Relationship) (string, error) { - err := e.validateRelationship(rel) - if err != nil { - return "", err +// DeleteRelationships removes the specified relationships. +// If any relationships fails to be deleted, all completed deletions are re-created. +func (e *engine) DeleteRelationships(ctx context.Context, relationships ...types.Relationship) (string, error) { + ctx, span := tracer.Start(ctx, "engine.DeleteRelationships", trace.WithAttributes(attribute.Int("relationships", len(relationships)))) + + defer span.End() + + var errors []error + + span.AddEvent("validating relationships") + + for i, relationship := range relationships { + err := e.validateRelationship(relationship) + if err != nil { + err = fmt.Errorf("%w: invalid relationship %d", err, i) + + span.RecordError(err) + + errors = append(errors, err) + } } - resType := e.namespace + "/" + rel.Resource.Type - subjType := e.namespace + "/" + rel.Subject.Type + if len(errors) != 0 { + span.SetStatus(codes.Error, "invalid relationships") - filter := &pb.RelationshipFilter{ - ResourceType: resType, - OptionalResourceId: rel.Resource.ID.String(), - OptionalRelation: rel.Relation, - OptionalSubjectFilter: &pb.SubjectFilter{ - SubjectType: subjType, - OptionalSubjectId: rel.Subject.ID.String(), - }, + return "", multierr.Combine(errors...) } - return e.deleteRelationships(ctx, filter) + errors = []error{} + + var ( + complete []types.Relationship + queryToken string + dErr error + cErr error + ) + + span.AddEvent("deleting relationships") + + for i, relationship := range relationships { + resType := e.namespace + "/" + relationship.Resource.Type + subjType := e.namespace + "/" + relationship.Subject.Type + + filter := &pb.RelationshipFilter{ + ResourceType: resType, + OptionalResourceId: relationship.Resource.ID.String(), + OptionalRelation: relationship.Relation, + OptionalSubjectFilter: &pb.SubjectFilter{ + SubjectType: subjType, + OptionalSubjectId: relationship.Subject.ID.String(), + }, + } + + queryToken, dErr = e.deleteRelationships(ctx, filter) + if dErr != nil { + e.logger.Errorf("%w: failed to delete relationship %d reverting %d completed deletes", dErr, i, len(complete)) + + err := fmt.Errorf("%w: failed to delete relationship %d", dErr, i) + + span.RecordError(err) + + errors = append(errors, err) + + break + } + + complete = append(complete, relationship) + } + + if len(errors) != 0 { + span.SetStatus(codes.Error, "error occurred deleting relationships") + + if len(complete) != 0 { + span.AddEvent("recreating deleted relationships") + + _, cErr = e.CreateRelationships(ctx, complete) + if cErr != nil { + e.logger.Error("%w: failed to revert %d deleted relationships", cErr, len(complete)) + + err := fmt.Errorf("%w: failed to revert deleted relationships", cErr) + + span.RecordError(err) + + errors = append(errors, err) + } + } + + return "", multierr.Combine(errors...) + } + + return queryToken, nil } -// DeleteRelationships deletes all relationships originating from the given resource. -func (e *engine) DeleteRelationships(ctx context.Context, resource types.Resource) (string, error) { +// DeleteResourceRelationships deletes all relationships originating from the given resource. +func (e *engine) DeleteResourceRelationships(ctx context.Context, resource types.Resource) (string, error) { resType := e.namespace + "/" + resource.Type filter := &pb.RelationshipFilter{ diff --git a/internal/query/relations_test.go b/internal/query/relations_test.go index d568c630..c844d40f 100644 --- a/internal/query/relations_test.go +++ b/internal/query/relations_test.go @@ -533,7 +533,7 @@ func TestRelationshipDelete(t *testing.T) { } testFn := func(ctx context.Context, input types.Relationship) testingx.TestResult[[]types.Relationship] { - queryToken, err := e.DeleteRelationship(ctx, input) + queryToken, err := e.DeleteRelationships(ctx, input) if err != nil { return testingx.TestResult[[]types.Relationship]{ Err: err, diff --git a/internal/query/service.go b/internal/query/service.go index c984b65a..8433680c 100644 --- a/internal/query/service.go +++ b/internal/query/service.go @@ -5,12 +5,17 @@ import ( "github.com/authzed/authzed-go/v1" "go.infratographer.com/x/gidx" + "go.opentelemetry.io/otel" "go.uber.org/zap" "go.infratographer.com/permissions-api/internal/iapl" "go.infratographer.com/permissions-api/internal/types" ) +var ( + tracer = otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query") +) + // Engine represents a client for making permissions queries. type Engine interface { AssignSubjectRole(ctx context.Context, subject types.Resource, role types.Role) (string, error) @@ -23,9 +28,9 @@ type Engine interface { ListRelationshipsFrom(ctx context.Context, resource types.Resource, queryToken string) ([]types.Relationship, error) ListRelationshipsTo(ctx context.Context, resource types.Resource, queryToken string) ([]types.Relationship, error) ListRoles(ctx context.Context, resource types.Resource, queryToken string) ([]types.Role, error) - DeleteRelationship(ctx context.Context, rel types.Relationship) (string, error) + DeleteRelationships(ctx context.Context, relationships ...types.Relationship) (string, error) DeleteRole(ctx context.Context, roleResource types.Resource, queryToken string) (string, error) - DeleteRelationships(ctx context.Context, resource types.Resource) (string, error) + DeleteResourceRelationships(ctx context.Context, resource types.Resource) (string, error) NewResourceFromID(id gidx.PrefixedID) (types.Resource, error) GetResourceType(name string) *types.ResourceType SubjectHasPermission(ctx context.Context, subject types.Resource, action string, resource types.Resource) error diff --git a/pkg/permissions/config.go b/pkg/permissions/config.go index 7bab9d96..90699960 100644 --- a/pkg/permissions/config.go +++ b/pkg/permissions/config.go @@ -10,10 +10,16 @@ import ( type Config struct { // URL is the URL checks should be executed against URL string + + // IgnoreNoResponders will ignore no responder errors when auth relationship requests are published. + IgnoreNoResponders bool } // MustViperFlags adds permissions config flags and viper bindings func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet) { flags.String("permissions-url", "", "sets the permissions url checks should be run against") viperx.MustBindFlag(v, "permissions.url", flags.Lookup("permissions-url")) + + flags.String("permissions-ignore-no-responders", "", "ignores no responder errors when auth relationship requests are published") + viperx.MustBindFlag(v, "permissions.ignoreAuthRelationshipNoResponders", flags.Lookup("permissions-ignore-no-responders")) } diff --git a/pkg/permissions/mockpermissions/permissions.go b/pkg/permissions/mockpermissions/permissions.go new file mode 100644 index 00000000..7bb455eb --- /dev/null +++ b/pkg/permissions/mockpermissions/permissions.go @@ -0,0 +1,50 @@ +// Package mockpermissions implements permissions.AuthRelationshipRequestHandler. +// Simplifying testing of relationship creation in applications. +package mockpermissions + +import ( + "context" + + "github.com/stretchr/testify/mock" + "go.infratographer.com/permissions-api/pkg/permissions" + "go.infratographer.com/x/events" + "go.infratographer.com/x/gidx" +) + +var _ permissions.AuthRelationshipRequestHandler = (*MockPermissions)(nil) + +// MockPermissions implements permissions.AuthRelationshipRequestHandler. +type MockPermissions struct { + mock.Mock +} + +// ContextWithHandler returns the context with the mock permissions handler defined. +func (p *MockPermissions) ContextWithHandler(ctx context.Context) context.Context { + return context.WithValue(ctx, permissions.AuthRelationshipRequestHandlerCtxKey, p) +} + +// CreateAuthRelationships implements permissions.AuthRelationshipRequestHandler. +func (p *MockPermissions) CreateAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + calledArgs := []interface{}{topic, resourceID} + + for _, rel := range relations { + calledArgs = append(calledArgs, rel) + } + + args := p.Called(calledArgs...) + + return args.Error(0) +} + +// DeleteAuthRelationships implements permissions.AuthRelationshipRequestHandler. +func (p *MockPermissions) DeleteAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + calledArgs := []interface{}{topic, resourceID} + + for _, rel := range relations { + calledArgs = append(calledArgs, rel) + } + + args := p.Called(calledArgs...) + + return args.Error(0) +} diff --git a/pkg/permissions/mockpermissions/permissions_test.go b/pkg/permissions/mockpermissions/permissions_test.go new file mode 100644 index 00000000..cfcea1de --- /dev/null +++ b/pkg/permissions/mockpermissions/permissions_test.go @@ -0,0 +1,49 @@ +package mockpermissions_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.infratographer.com/permissions-api/pkg/permissions" + "go.infratographer.com/permissions-api/pkg/permissions/mockpermissions" + "go.infratographer.com/x/events" + "go.infratographer.com/x/gidx" +) + +func TestPermissions(t *testing.T) { + t.Run("create", func(t *testing.T) { + mockPerms := new(mockpermissions.MockPermissions) + + ctx := mockPerms.ContextWithHandler(context.Background()) + + relation := events.AuthRelationshipRelation{ + Relation: "parent", + SubjectID: "tnntten-abc", + } + + mockPerms.On("CreateAuthRelationships", "test", gidx.PrefixedID("tnntten-abc123"), relation).Return(nil) + + err := permissions.CreateAuthRelationships(ctx, "test", "tnntten-abc123", relation) + require.NoError(t, err) + + mockPerms.AssertExpectations(t) + }) + t.Run("delete", func(t *testing.T) { + mockPerms := new(mockpermissions.MockPermissions) + + ctx := mockPerms.ContextWithHandler(context.Background()) + + relation := events.AuthRelationshipRelation{ + Relation: "parent", + SubjectID: "tnntten-abc", + } + + mockPerms.On("DeleteAuthRelationships", "test", gidx.PrefixedID("tnntten-abc123"), relation).Return(nil) + + err := permissions.DeleteAuthRelationships(ctx, "test", "tnntten-abc123", relation) + require.NoError(t, err) + + mockPerms.AssertExpectations(t) + }) +} diff --git a/pkg/permissions/options.go b/pkg/permissions/options.go index d1db8fff..3103a097 100644 --- a/pkg/permissions/options.go +++ b/pkg/permissions/options.go @@ -4,6 +4,7 @@ import ( "net/http" "github.com/labstack/echo/v4/middleware" + "go.infratographer.com/x/events" "go.uber.org/zap" ) @@ -19,6 +20,15 @@ func WithLogger(logger *zap.SugaredLogger) Option { } } +// WithEventsPublisher sets the underlying event publisher the auth handler uses +func WithEventsPublisher(publisher events.AuthRelationshipPublisher) Option { + return func(p *Permissions) error { + p.publisher = publisher + + return nil + } +} + // WithHTTPClient sets the underlying http client the auth handler uses func WithHTTPClient(client *http.Client) Option { return func(p *Permissions) error { diff --git a/pkg/permissions/permissions.go b/pkg/permissions/permissions.go index 33b64d47..0f0ec2fd 100644 --- a/pkg/permissions/permissions.go +++ b/pkg/permissions/permissions.go @@ -15,6 +15,7 @@ import ( "github.com/labstack/echo/v4/middleware" "github.com/pkg/errors" "go.infratographer.com/x/echojwtx" + "go.infratographer.com/x/events" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -35,23 +36,31 @@ var ( } tracer = otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/pkg/permissions") + + // ErrPermissionsMiddlewareMissing is returned when a permissions method has been called but the middleware is missing. + ErrPermissionsMiddlewareMissing = errors.New("permissions middleware missing") ) // Permissions handles supporting authorization checks type Permissions struct { - enabled bool - logger *zap.SugaredLogger - client *http.Client - url *url.URL - skipper middleware.Skipper - defaultChecker Checker + enableChecker bool + + logger *zap.SugaredLogger + publisher events.AuthRelationshipPublisher + client *http.Client + url *url.URL + skipper middleware.Skipper + defaultChecker Checker + ignoreNoResponders bool } // Middleware produces echo middleware to handle authorization checks func (p *Permissions) Middleware() echo.MiddlewareFunc { return func(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { - if !p.enabled || p.skipper(c) { + setAuthRelationshipRequestHandler(c, p) + + if !p.enableChecker || p.skipper(c) { setCheckerContext(c, p.defaultChecker) return next(c) @@ -161,10 +170,11 @@ func (p *Permissions) checker(c echo.Context, actor, token string) Checker { // New creates a new Permissions instance func New(config Config, options ...Option) (*Permissions, error) { p := &Permissions{ - enabled: config.URL != "", - client: defaultClient, - skipper: middleware.DefaultSkipper, - defaultChecker: DefaultDenyChecker, + enableChecker: config.URL != "", + client: defaultClient, + skipper: middleware.DefaultSkipper, + defaultChecker: DefaultDenyChecker, + ignoreNoResponders: config.IgnoreNoResponders, } if config.URL != "" { diff --git a/pkg/permissions/relationships.go b/pkg/permissions/relationships.go new file mode 100644 index 00000000..eee5592d --- /dev/null +++ b/pkg/permissions/relationships.go @@ -0,0 +1,139 @@ +package permissions + +import ( + "context" + "errors" + + "github.com/labstack/echo/v4" + "go.infratographer.com/x/events" + "go.infratographer.com/x/gidx" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" +) + +var ( + // AuthRelationshipRequestHandlerCtxKey is the context key used to set the auth relationship request handler. + AuthRelationshipRequestHandlerCtxKey = authRelationshipRequestHandlerCtxKey{} +) + +type authRelationshipRequestHandlerCtxKey struct{} + +func setAuthRelationshipRequestHandler(c echo.Context, requestHandler AuthRelationshipRequestHandler) { + req := c.Request().WithContext( + context.WithValue( + c.Request().Context(), + AuthRelationshipRequestHandlerCtxKey, + requestHandler, + ), + ) + + c.SetRequest(req) +} + +// AuthRelationshipRequestHandler defines the required methods to create or update an auth relationship. +type AuthRelationshipRequestHandler interface { + CreateAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error + DeleteAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error +} + +func (p *Permissions) submitAuthRelationshipRequest(ctx context.Context, topic string, request events.AuthRelationshipRequest) error { + ctx, span := tracer.Start(ctx, "permissions.submitAuthRelationshipRequest", + trace.WithAttributes( + attribute.String("events.topic", topic), + attribute.String("events.object_id", request.ObjectID.String()), + attribute.String("events.action", string(request.Action)), + ), + ) + + defer span.End() + + if err := request.Validate(); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + // if no publisher is defined, requests are disabled. + if p.publisher == nil { + span.AddEvent("publish requests disabled") + + return nil + } + + var errs []error + + resp, err := p.publisher.PublishAuthRelationshipRequest(ctx, topic, request) + if err != nil { + if p.ignoreNoResponders && errors.Is(err, events.ErrRequestNoResponders) { + span.AddEvent("ignored no request responders") + + return nil + } + + errs = append(errs, err) + } + + if resp != nil { + if resp.Error() != nil { + errs = append(errs, err) + } + + errs = append(errs, resp.Message().Errors...) + } + + err = multierr.Combine(errs...) + + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + return nil +} + +// CreateAuthRelationships publishes a create auth relationship request, blocking until a response has been received. +func (p *Permissions) CreateAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + request := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: resourceID, + Relations: relations, + } + + return p.submitAuthRelationshipRequest(ctx, topic, request) +} + +// DeleteAuthRelationships publishes a delete auth relationship request, blocking until a response has been received. +func (p *Permissions) DeleteAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + request := events.AuthRelationshipRequest{ + Action: events.DeleteAuthRelationshipAction, + ObjectID: resourceID, + Relations: relations, + } + + return p.submitAuthRelationshipRequest(ctx, topic, request) +} + +// CreateAuthRelationships publishes a create auth relationship request, blocking until a response has been received. +func CreateAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + handler, ok := ctx.Value(AuthRelationshipRequestHandlerCtxKey).(AuthRelationshipRequestHandler) + if !ok { + return ErrPermissionsMiddlewareMissing + } + + return handler.CreateAuthRelationships(ctx, topic, resourceID, relations...) +} + +// DeleteAuthRelationships publishes a delete auth relationship request, blocking until a response has been received. +func DeleteAuthRelationships(ctx context.Context, topic string, resourceID gidx.PrefixedID, relations ...events.AuthRelationshipRelation) error { + handler, ok := ctx.Value(AuthRelationshipRequestHandlerCtxKey).(AuthRelationshipRequestHandler) + if !ok { + return ErrPermissionsMiddlewareMissing + } + + return handler.DeleteAuthRelationships(ctx, topic, resourceID, relations...) +} diff --git a/pkg/permissions/relationships_test.go b/pkg/permissions/relationships_test.go new file mode 100644 index 00000000..75cd6a12 --- /dev/null +++ b/pkg/permissions/relationships_test.go @@ -0,0 +1,266 @@ +package permissions_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/labstack/echo/v4" + "github.com/stretchr/testify/require" + "go.infratographer.com/permissions-api/pkg/permissions" + "go.infratographer.com/x/events" + "go.infratographer.com/x/gidx" + "go.infratographer.com/x/testing/eventtools" +) + +func TestMiddlewareMissing(t *testing.T) { + ctx := context.Background() + + err := permissions.CreateAuthRelationships(ctx, "test", gidx.NullPrefixedID) + require.Error(t, err) + require.ErrorIs(t, err, permissions.ErrPermissionsMiddlewareMissing) + + err = permissions.DeleteAuthRelationships(ctx, "test", gidx.NullPrefixedID) + require.Error(t, err) + require.ErrorIs(t, err, permissions.ErrPermissionsMiddlewareMissing) +} + +func TestNoRespondersIgnore(t *testing.T) { + t.Run("not ignored", func(t *testing.T) { + ctx := context.Background() + + mockEvents := new(eventtools.MockConnection) + + config := permissions.Config{ + IgnoreNoResponders: false, + } + + perms, err := permissions.New(config, permissions.WithEventsPublisher(mockEvents)) + require.NoError(t, err) + + relation := events.AuthRelationshipRelation{ + Relation: "parent", + SubjectID: "testten-abc", + } + + expectRelationshipRequest := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: "testten-abc123", + Relations: []events.AuthRelationshipRelation{ + relation, + }, + } + + responseMessage := new(eventtools.MockMessage[events.AuthRelationshipResponse]) + + responseMessage.On("Message").Return(events.AuthRelationshipResponse{}) + responseMessage.On("Error").Return(nil) + + mockEvents.On("PublishAuthRelationshipRequest", "test", expectRelationshipRequest).Return(responseMessage, events.ErrRequestNoResponders) + + err = perms.CreateAuthRelationships(ctx, "test", "testten-abc123", relation) + require.Error(t, err) + require.ErrorIs(t, err, events.ErrRequestNoResponders) + + mockEvents.AssertExpectations(t) + }) + t.Run("ignored", func(t *testing.T) { + ctx := context.Background() + + mockEvents := new(eventtools.MockConnection) + + config := permissions.Config{ + IgnoreNoResponders: true, + } + + perms, err := permissions.New(config, permissions.WithEventsPublisher(mockEvents)) + require.NoError(t, err) + + relation := events.AuthRelationshipRelation{ + Relation: "parent", + SubjectID: "testten-abc", + } + + expectRelationshipRequest := events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: "testten-abc123", + Relations: []events.AuthRelationshipRelation{ + relation, + }, + } + + responseMessage := new(eventtools.MockMessage[events.AuthRelationshipResponse]) + + mockEvents.On("PublishAuthRelationshipRequest", "test", expectRelationshipRequest).Return(responseMessage, events.ErrRequestNoResponders) + + err = perms.CreateAuthRelationships(ctx, "test", "testten-abc123", relation) + require.NoError(t, err) + + mockEvents.AssertExpectations(t) + }) +} + +func TestRelationshipCreate(t *testing.T) { + testCases := []struct { + name string + events bool + + resourceID gidx.PrefixedID + relation string + subjectID gidx.PrefixedID + + expectRequest *events.AuthRelationshipRequest + + responseErrors []error + + expectError error + }{ + { + "no events", + false, + "testten-abc123", + "parent", + "testten-abc", + nil, + nil, + nil, + }, + { + "missing resourceID", + true, + "", + "relation", + "subject", + nil, + nil, + events.ErrMissingAuthRelationshipRequestObjectID, + }, + { + "missing relation", + true, + "resource", + "", + "subject", + nil, + nil, + events.ErrMissingAuthRelationshipRequestRelationRelation, + }, + { + "missing subject", + true, + "resource", + "relation", + "", + nil, + nil, + events.ErrMissingAuthRelationshipRequestRelationSubjectID, + }, + { + "success", + true, + "testten-abc123", + "parent", + "testten-abc", + &events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: "testten-abc123", + Relations: []events.AuthRelationshipRelation{ + { + Relation: "parent", + SubjectID: "testten-abc", + }, + }, + }, + nil, + nil, + }, + { + "response errors are returned", + true, + "testten-abc123", + "parent", + "testten-abc", + &events.AuthRelationshipRequest{ + Action: events.WriteAuthRelationshipAction, + ObjectID: "testten-abc123", + Relations: []events.AuthRelationshipRelation{ + { + Relation: "parent", + SubjectID: "testten-abc", + }, + }, + }, + []error{events.ErrProviderNotConfigured}, + events.ErrProviderNotConfigured, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockEvents := new(eventtools.MockConnection) + + var options []permissions.Option + + if tc.events { + options = append(options, permissions.WithEventsPublisher(mockEvents)) + } + + perms, err := permissions.New(permissions.Config{}, options...) + + require.NoError(t, err) + + resp := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + + engine := echo.New() + + ctx := engine.NewContext(req, resp) + + var nextCalled bool + + nextFn := func(c echo.Context) error { + nextCalled = true + + return nil + } + + err = perms.Middleware()(nextFn)(ctx) + + require.NoError(t, err) + + require.True(t, nextCalled, "next should have been called") + + if tc.expectRequest != nil { + response := events.AuthRelationshipResponse{ + Errors: tc.responseErrors, + } + + respMsg := new(eventtools.MockMessage[events.AuthRelationshipResponse]) + + respMsg.On("Message").Return(response, nil) + respMsg.On("Error").Return(nil) + + mockEvents.On("PublishAuthRelationshipRequest", "test", *tc.expectRequest).Return(respMsg, nil) + } + + relation := events.AuthRelationshipRelation{ + Relation: tc.relation, + SubjectID: tc.subjectID, + } + + err = perms.CreateAuthRelationships(ctx.Request().Context(), "test", tc.resourceID, relation) + + mockEvents.AssertExpectations(t) + + if tc.expectError != nil { + require.Error(t, err, "expected error to be returned") + require.ErrorIs(t, err, tc.expectError, "unexpected error returned") + + return + } + + require.NoError(t, err) + }) + } +}