diff --git a/cmd/createrole.go b/cmd/createrole.go index 594f0157..9b998c2e 100644 --- a/cmd/createrole.go +++ b/cmd/createrole.go @@ -6,7 +6,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "go.infratographer.com/x/crdbx" - "go.infratographer.com/x/events" "go.infratographer.com/x/gidx" "go.infratographer.com/x/viperx" @@ -65,16 +64,6 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } - eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) - if err != nil { - logger.Fatalw("failed to initialize events", "error", err) - } - - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -109,17 +98,11 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("error parsing subject ID", "error", err) } - engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger)) if err != nil { logger.Fatalw("error creating engine", "error", err) } - defer func() { - if err := engine.Stop(); err != nil { - logger.Errorw("error stopping engine", "error", err) - } - }() - resource, err := engine.NewResourceFromID(resourceID) if err != nil { logger.Fatalw("error creating resource", "error", err) diff --git a/cmd/kv.go b/cmd/kv.go deleted file mode 100644 index a5c287e1..00000000 --- a/cmd/kv.go +++ /dev/null @@ -1,30 +0,0 @@ -package cmd - -import ( - "errors" - - "github.com/nats-io/nats.go" - "go.infratographer.com/x/events" - - "go.infratographer.com/permissions-api/internal/config" -) - -var ( - errInvalidSource = errors.New("events source must be a NATS connection") -) - -func initializeKV(cfg config.EventsConfig, eventsConn events.Connection) (nats.KeyValue, error) { - // While in theory the events package supports any kind of broker, in practice we only - // support NATS. - natsConn, ok := eventsConn.Source().(*nats.Conn) - if !ok { - return nil, errInvalidSource - } - - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - return js.KeyValue(cfg.ZedTokenBucket) -} diff --git a/cmd/server.go b/cmd/server.go index cfb947b3..4e1d73fe 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -8,7 +8,6 @@ import ( "go.infratographer.com/x/crdbx" "go.infratographer.com/x/echojwtx" "go.infratographer.com/x/echox" - "go.infratographer.com/x/events" "go.infratographer.com/x/otelx" "go.infratographer.com/x/versionx" "go.uber.org/zap" @@ -39,7 +38,6 @@ func init() { echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen) otelx.MustViperFlags(v, serverCmd.Flags()) echojwtx.MustViperFlags(v, serverCmd.Flags()) - events.MustViperFlags(v, serverCmd.Flags(), appName) } func serve(_ context.Context, cfg *config.AppConfig) { @@ -53,16 +51,6 @@ func serve(_ context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } - eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) - if err != nil { - logger.Fatalw("failed to initialize events", "error", err) - } - - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -87,17 +75,11 @@ func serve(_ context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger)) if err != nil { logger.Fatalw("error creating engine", "error", err) } - defer func() { - if err := engine.Stop(); err != nil { - logger.Errorw("error stopping engine", "error", err) - } - }() - srv, err := echox.NewServer( logger.Desugar(), echox.ConfigFromViper(viper.GetViper()), diff --git a/cmd/worker.go b/cmd/worker.go index 3f18d9bb..5ab6d610 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -59,11 +59,6 @@ func worker(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("failed to initialize events", "error", err) } - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -88,7 +83,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy)) if err != nil { logger.Fatalw("error creating engine", "error", err) } diff --git a/docs/20240515-zedtokens.md b/docs/20240515-zedtokens.md new file mode 100644 index 00000000..18a2c0eb --- /dev/null +++ b/docs/20240515-zedtokens.md @@ -0,0 +1,46 @@ +# Consistency with ZedTokens + +## Overview + +In SpiceDB, resources are arranged in a graph where different services may be contributing to that graph. For example, permissions-api itself manages roles and role bindings, while tenant-api manages tenants. Resources are updated over NATS request/reply. + +SpiceDB breaks updates down into quantized revisions of configurable duration, meaning ACL updates within a given revision window are not visible by default until the next revision begins. Using ZedTokens, however, clients can request data at least as fresh as a given exact point in time. This allows for clients to control the consistency they need for a given SpiceDB request on a per-request basis. + +In general, services expect that immediately after updating relationships in permissions-api, they can make authorization decisions based on data at least as fresh as when they made those updates. + +## Goals + +This proposal is meant to achieve the following goals: + +* Permissions checks made on a resource after it is updated must use data at least as fresh as the last update to that resource + +## Non-goals + +This proposal is not meant to achieve the following goals: + +* Updates to the graph are globally and immediately propagated + +## Proposed solution + +To mitigate this issue, permissions-api can be updated to use a table in CRDB to populate ZedTokens for recently updated resources. By doing this, permissions-api becomes responsible for determining the freshness of lookups. On permissions checks for a given resource, the following consistency strategy is used: + +* If a ZedToken exists for that resource, use `at_least_as_fresh` with the given ZedToken +* If no ZedToken exists, use `minimize_latency` + +The advantage of this approach is that it keeps management of consistency within permissions-api, meaning future changes do not (necessarily) require updates to other services, nor does it change current API semantics. Additionally, it does not introduce new dependencies to permissions-api, instead leveraging CRDB's availability and fault tolerance guarantees. + +## Constraints and limitations + +As designed, this only provides immediately visible updates for a given resource; other indirectly affected resources are not updated (but could be in the future). Additionally, given CRDB leaseholder behavior, multi-region deployments may see longer durations when performing permissions checks if the ZedToken table is not a global table. + +## Alternatives considered + +### Stream ZedToken updates using the SpiceDB Watch API + +We could minimize lookup time for ZedTokens by using the SpiceDB Watch API and streaming updates to all permissions-api replicas. However, this means updates will not be immediately visible for a given resource globally. While tokens could be stored in clients (i.e., using the IAM runtime), this merely pushes the complexity of managing ZedTokens to downstream services. + +### Store ZedTokens using NATS KV + +ZedTokens could be stored using KV in NATS as a coordination mechanism between servers globally. NATS clusters are in general meant to have very low latency between servers, where a single cluster coordinates a given stream (and thus a given KV bucket). This means that for immediate updates to resources using KV, all permissions-api replicas either need to read from a single NATS cluster (introducing latency as a function of distance from the cluster) or use a large cluster that spans many regions. The latter approach is [explicitly discouraged][nats-discussion] by NATS maintainers. + +[nats-discussion]: https://github.com/nats-io/nats-server/discussions/5317#discussioncomment-9138192 diff --git a/go.mod b/go.mod index 0e50ccbc..45c26730 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,8 @@ require ( github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b github.com/cockroachdb/cockroach-go/v2 v2.3.7 github.com/go-jose/go-jose/v4 v4.0.1 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jackc/pgx/v5 v5.5.5 github.com/labstack/echo/v4 v4.11.4 - github.com/nats-io/nats.go v1.34.1 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.19.2 github.com/spf13/cobra v1.8.0 @@ -68,6 +66,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/nats-io/jwt/v2 v2.5.5 // indirect github.com/nats-io/nats-server/v2 v2.10.12 // indirect + github.com/nats-io/nats.go v1.34.1 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 29c8f71a..e6ca63f6 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/internal/query/relations.go b/internal/query/relations.go index ae807ab4..ed965c8a 100644 --- a/internal/query/relations.go +++ b/internal/query/relations.go @@ -113,7 +113,7 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc defer span.End() - consistency, consName := e.determineConsistency(resource) + consistency, consName := e.determineConsistency(ctx, resource) span.SetAttributes( attribute.String( "permissions.consistency", @@ -284,7 +284,7 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations } } - relUpdates := e.relationshipsToUpdates(rels) + relUpdates := e.relationshipsToUpdates(rels, pb.RelationshipUpdate_OPERATION_TOUCH) request := &pb.WriteRelationshipsRequest{ Updates: relUpdates, @@ -591,7 +591,7 @@ func (e *engine) roleResourceRelationshipsTouchDelete(roleResource, resource typ return rels } -func (e *engine) relationshipsToUpdates(rels []types.Relationship) []*pb.RelationshipUpdate { +func (e *engine) relationshipsToUpdates(rels []types.Relationship, operation pb.RelationshipUpdate_Operation) []*pb.RelationshipUpdate { relUpdates := make([]*pb.RelationshipUpdate, len(rels)) for i, rel := range rels { @@ -599,7 +599,7 @@ func (e *engine) relationshipsToUpdates(rels []types.Relationship) []*pb.Relatio resRef := resourceToSpiceDBRef(e.namespace, rel.Resource) relUpdates[i] = &pb.RelationshipUpdate{ - Operation: pb.RelationshipUpdate_OPERATION_TOUCH, + Operation: operation, Relationship: &pb.Relationship{ Resource: resRef, Relation: rel.Relation, @@ -677,65 +677,22 @@ func (e *engine) DeleteRelationships(ctx context.Context, relationships ...types return multierr.Combine(errors...) } - errors = []error{} - - var ( - complete []types.Relationship - dErr error - cErr error - ) - - span.AddEvent("deleting relationships") - - for i, relationship := range relationships { - resType := e.namespace + "/" + relationship.Resource.Type - subjType := e.namespace + "/" + relationship.Subject.Type - - filter := &pb.RelationshipFilter{ - ResourceType: resType, - OptionalResourceId: relationship.Resource.ID.String(), - OptionalRelation: relationship.Relation, - OptionalSubjectFilter: &pb.SubjectFilter{ - SubjectType: subjType, - OptionalSubjectId: relationship.Subject.ID.String(), - }, - } - - if dErr = e.deleteRelationships(ctx, filter); dErr != nil { - e.logger.Errorf("%w: failed to delete relationship %d reverting %d completed deletes", dErr, i, len(complete)) + relUpdates := e.relationshipsToUpdates(relationships, pb.RelationshipUpdate_OPERATION_DELETE) - err := fmt.Errorf("%w: failed to delete relationship %d", dErr, i) - - span.RecordError(err) - - errors = append(errors, err) - - break - } - - complete = append(complete, relationship) + request := &pb.WriteRelationshipsRequest{ + Updates: relUpdates, } - if len(errors) != 0 { - span.SetStatus(codes.Error, "error occurred deleting relationships") - - if len(complete) != 0 { - span.AddEvent("recreating deleted relationships") - - if cErr = e.CreateRelationships(ctx, complete); cErr != nil { - e.logger.Error("%w: failed to revert %d deleted relationships", cErr, len(complete)) - - err := fmt.Errorf("%w: failed to revert deleted relationships", cErr) - - span.RecordError(err) - - errors = append(errors, err) - } - } + resp, err := e.client.WriteRelationships(ctx, request) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) - return multierr.Combine(errors...) + return err } + e.updateRelationshipZedTokens(ctx, relationships, resp.WrittenAt.Token) + return nil } @@ -1266,7 +1223,12 @@ func (e *engine) rollbackUpdates(ctx context.Context, updates []*pb.Relationship }) } - return e.applyUpdates(ctx, rollbacks) + _, err := e.client.WriteRelationships(ctx, &pb.WriteRelationshipsRequest{Updates: rollbacks}) + if err != nil { + return err + } + + return nil } // applyUpdates is a wrapper function around the spiceDB WriteRelationships method diff --git a/internal/query/relations_test.go b/internal/query/relations_test.go index 24dfcff3..62860f04 100644 --- a/internal/query/relations_test.go +++ b/internal/query/relations_test.go @@ -6,12 +6,10 @@ import ( pb "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/authzed-go/v1" - "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.infratographer.com/x/gidx" - "go.infratographer.com/x/testing/eventtools" "go.infratographer.com/permissions-api/internal/iapl" "go.infratographer.com/permissions-api/internal/spicedbx" @@ -40,16 +38,6 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl _, err = client.WriteSchema(ctx, request) require.NoError(t, err) - natsSrv, err := eventtools.NewNatsServer() - require.NoError(t, err) - - kvCfg := nats.KeyValueConfig{ - Bucket: "zedtokens", - } - - kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg) - require.NoError(t, err) - t.Cleanup(func() { cleanDB(ctx, t, client, namespace, policy) cleanStore() @@ -57,13 +45,9 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl // We call the constructor here to ensure the engine is created appropriately, but // then return the underlying type so we can do testing with it. - out, err := NewEngine(namespace, client, kv, store, WithPolicy(policy)) + out, err := NewEngine(namespace, client, store, WithPolicy(policy)) require.NoError(t, err) - t.Cleanup(func() { - out.Stop() //nolint:errcheck - }) - return out.(*engine) } diff --git a/internal/query/rolebindings.go b/internal/query/rolebindings.go index 845319ef..80b00349 100644 --- a/internal/query/rolebindings.go +++ b/internal/query/rolebindings.go @@ -204,7 +204,7 @@ func (e *engine) CreateRoleBinding( updates = append(updates, subjUpdates...) - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) @@ -306,7 +306,7 @@ func (e *engine) DeleteRoleBinding(ctx context.Context, rb types.Resource) error } // apply changes - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) @@ -503,7 +503,7 @@ func (e *engine) UpdateRoleBinding(ctx context.Context, actor, rb types.Resource updates = append(updates, update) } - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) diff --git a/internal/query/service.go b/internal/query/service.go index 4c6d647c..af600f98 100644 --- a/internal/query/service.go +++ b/internal/query/service.go @@ -4,8 +4,6 @@ import ( "context" "github.com/authzed/authzed-go/v1" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/nats-io/nats.go" "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -77,8 +75,6 @@ type Engine interface { GetRoleBindingResource(ctx context.Context, rb types.Resource) (types.Resource, error) AllActions() []string - - Stop() error } type engine struct { @@ -86,9 +82,6 @@ type engine struct { logger *zap.SugaredLogger namespace string client *authzed.Client - kv nats.KeyValue - keyWatcher nats.KeyWatcher - zedTokenCache *expirable.LRU[string, string] store storage.Storage schema []types.ResourceType schemaPrefixMap map[string]types.ResourceType @@ -166,14 +159,13 @@ func resourceHasRoleBindingV2(resType types.ResourceType) *types.ConditionRoleBi } // NewEngine returns a new client for making permissions queries. -func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, store storage.Storage, options ...Option) (Engine, error) { +func NewEngine(namespace string, client *authzed.Client, store storage.Storage, options ...Option) (Engine, error) { tracer := otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query") e := &engine{ logger: zap.NewNop().Sugar(), namespace: namespace, client: client, - kv: kv, store: store, tracer: tracer, } @@ -190,17 +182,9 @@ func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, store e.cacheSchemaResources() } - if err := e.initZedTokenCache(); err != nil { - return nil, err - } - return e, nil } -func (e *engine) Stop() error { - return e.keyWatcher.Stop() -} - // Option is a functional option for the engine type Option func(*engine) diff --git a/internal/query/zedtokens.go b/internal/query/zedtokens.go index e226dff6..58c79a14 100644 --- a/internal/query/zedtokens.go +++ b/internal/query/zedtokens.go @@ -2,16 +2,14 @@ package query import ( "context" - "errors" + + "go.infratographer.com/permissions-api/internal/types" pb "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/nats-io/nats.go" + "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - - "go.infratographer.com/permissions-api/internal/types" ) const ( @@ -19,66 +17,6 @@ const ( consistencyAtLeastAsFresh = "at_least_as_fresh" ) -// initZedTokenCache creates a new LRU cache that watches KV for ZedToken updates. -func (e *engine) initZedTokenCache() error { - status, err := e.kv.Status() - if err != nil { - return err - } - - ttl := status.TTL() - - keyWatcher, err := e.kv.WatchAll() - if err != nil { - return err - } - - lru := expirable.NewLRU[string, string](0, nil, ttl) - - e.keyWatcher = keyWatcher - e.zedTokenCache = lru - - go func() { - for entry := range e.keyWatcher.Updates() { - if entry == nil { - continue - } - - key := entry.Key() - value := string(entry.Value()) - - _, span := e.tracer.Start( - context.Background(), - "populateZedTokenCache", - trace.WithAttributes( - attribute.String( - "permissions.resource", - key, - ), - ), - ) - - e.zedTokenCache.Add(key, value) - - span.End() - } - }() - - return nil -} - -// getLatestZedToken attempts to get the latest ZedToken for the given resource ID. -func (e *engine) getLatestZedToken(resourceID string) (string, bool) { - resp, ok := e.zedTokenCache.Get(resourceID) - if !ok { - return "", false - } - - zedToken := string(resp) - - return zedToken, true -} - // upsertZedToken updates the ZedToken at the given resource ID key with the provided ZedToken. func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken string) error { _, span := e.tracer.Start( @@ -94,25 +32,13 @@ func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken defer span.End() - zedTokenBytes := []byte(zedToken) - - // Attempt to get a ZedToken. If we found one, update it. If not, create it. If some other error - // happened, log that and return - resp, getErr := e.kv.Get(resourceID) - - var err error - - switch { - // If we found a token, update it. This may fail if another client updated it before we did - case getErr == nil: - _, err = e.kv.Update(resourceID, zedTokenBytes, resp.Revision()) - // If we did not find a token, create it. This may fail if another client created an entry already - case errors.Is(getErr, nats.ErrKeyNotFound): - _, err = e.kv.Create(resourceID, zedTokenBytes) - // If something else happened, just keep moving - default: + prefixedID, err := gidx.Parse(resourceID) + if err != nil { + return err } + err = e.store.UpsertZedToken(ctx, prefixedID, zedToken) + // If an error happened when creating or updating the token, record it. if err != nil { span.RecordError(err) @@ -124,8 +50,11 @@ func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken return nil } -// updateRelationshipZedTokens updates the NATS KV bucket for ZedTokens, setting the given ZedToken +// updateRelationshipZedTokens updates the CRDB table for ZedTokens, setting the given ZedToken // as the latest point in time snapshot for every resource in the given list of relationships. +// +// This function updates the table using an out of band transaction, as if it fails we do not want +// to roll back the entire outer transaction. func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.Relationship, zedToken string) { resourceIDMap := map[string]struct{}{} for _, rel := range rels { @@ -133,11 +62,39 @@ func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.R resourceIDMap[rel.Subject.ID.String()] = struct{}{} } + ctx, span := e.tracer.Start( + ctx, + "updateRelationshipZedTokens", + ) + + defer span.End() + + dbCtx, err := e.store.BeginContext(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return + } + for resourceID := range resourceIDMap { - if err := e.upsertZedToken(ctx, resourceID, zedToken); err != nil { + if err := e.upsertZedToken(dbCtx, resourceID, zedToken); err != nil { e.logger.Warnw("error upserting ZedToken", "error", err.Error(), "resource_id", resourceID) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) + + return } } + + if err = e.store.CommitContext(dbCtx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) + } } // determineConsistency produces a consistency strategy based on whether a ZedToken exists for a @@ -145,27 +102,48 @@ func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.R // retrieved ZedToken. If no such token is found, minimize_latency is used. This ensures that if // NATS is not working or available for some reason, we can still make permissions checks (albeit // in a degraded state). -func (e *engine) determineConsistency(resource types.Resource) (*pb.Consistency, string) { - resourceID := resource.ID.String() - - zedToken, ok := e.getLatestZedToken(resourceID) - if !ok { - consistency := &pb.Consistency{ - Requirement: &pb.Consistency_MinimizeLatency{ - MinimizeLatency: true, - }, - } +func (e *engine) determineConsistency(ctx context.Context, resource types.Resource) (*pb.Consistency, string) { + resourceID := resource.ID - return consistency, consistencyMinimizeLatency - } + _, span := e.tracer.Start( + ctx, + "determineConsistency", + trace.WithAttributes( + attribute.Stringer( + "permissions.resource", + resourceID, + ), + ), + ) + + defer span.End() consistency := &pb.Consistency{ - Requirement: &pb.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: &pb.ZedToken{ - Token: zedToken, - }, + Requirement: &pb.Consistency_MinimizeLatency{ + MinimizeLatency: true, }, } - return consistency, consistencyAtLeastAsFresh + consistencyName := consistencyMinimizeLatency + + zedToken, err := e.store.GetLatestZedToken(ctx, resourceID) + + switch { + case err != nil: + e.logger.Warnw("error getting ZedToken", "error", err.Error()) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + case zedToken != "": + consistency = &pb.Consistency{ + Requirement: &pb.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: &pb.ZedToken{ + Token: zedToken, + }, + }, + } + + consistencyName = consistencyAtLeastAsFresh + } + + return consistency, consistencyName } diff --git a/internal/query/zedtokens_test.go b/internal/query/zedtokens_test.go index 52935c22..e61ebf69 100644 --- a/internal/query/zedtokens_test.go +++ b/internal/query/zedtokens_test.go @@ -7,7 +7,6 @@ import ( "go.infratographer.com/permissions-api/internal/testingx" "go.infratographer.com/permissions-api/internal/types" - "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.infratographer.com/x/gidx" @@ -46,19 +45,10 @@ func TestConsistency(t *testing.T) { }, } - // Watch for updates to the key to avoid racing - kw, err := e.kv.Watch(tenantID.String(), nats.UpdatesOnly()) - require.NoError(t, err) - - defer kw.Stop() //nolint:errcheck - err = e.CreateRelationships(ctx, rels) require.NoError(t, err) - // Wait until we know an update occurred - <-kw.Updates() - return ctx }, CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[string]) { @@ -77,7 +67,7 @@ func TestConsistency(t *testing.T) { } testFn := func(ctx context.Context, res types.Resource) testingx.TestResult[string] { - _, consistencyName := e.determineConsistency(res) + _, consistencyName := e.determineConsistency(ctx, res) out := testingx.TestResult[string]{ Success: consistencyName, diff --git a/internal/storage/migrations/20240515000000_zedtokens.sql b/internal/storage/migrations/20240515000000_zedtokens.sql new file mode 100644 index 00000000..de5ee9bc --- /dev/null +++ b/internal/storage/migrations/20240515000000_zedtokens.sql @@ -0,0 +1,15 @@ +-- +goose Up + +-- create "zedtokens" table +CREATE TABLE "zedtokens" ( + "resource_id" character varying NOT NULL, + "zedtoken" character varying NOT NULL, + "created_at" timestamptz NOT NULL, + "expires_at" timestamptz NOT NULL, + PRIMARY KEY ("resource_id") +) WITH (ttl_expiration_expression = 'expires_at', ttl_job_cron = '0 */4 * * *'); + +-- +goose Down + +-- drop "zedtokens" table +DROP TABLE "zedtokens"; diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 95b60dc9..11c7bbd0 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -12,6 +12,7 @@ import ( type Storage interface { RoleService RoleBindingService + ZedTokenService TransactionManager HealthCheck(ctx context.Context) error diff --git a/internal/storage/zedtokens.go b/internal/storage/zedtokens.go new file mode 100644 index 00000000..12d2260c --- /dev/null +++ b/internal/storage/zedtokens.go @@ -0,0 +1,63 @@ +package storage + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "go.infratographer.com/x/gidx" +) + +// ZedTokenService represents a service for getting and updating ZedTokens for resources. +type ZedTokenService interface { + GetLatestZedToken(ctx context.Context, ids ...gidx.PrefixedID) (string, error) + UpsertZedToken(ctx context.Context, id gidx.PrefixedID, zedToken string) error +} + +func (e *engine) GetLatestZedToken(ctx context.Context, ids ...gidx.PrefixedID) (string, error) { + db, err := getContextDBQuery(ctx, e) + if err != nil { + return "", err + } + + inClause, args := e.buildBatchInClauseWithIDs(ids) + q := fmt.Sprintf(` + SELECT zedtoken + FROM zedtokens + WHERE resource_id IN (%s) + AND current_timestamp() < expires_at + ORDER BY created_at DESC + LIMIT 1 + `, inClause) + + var out string + + err = db.QueryRowContext(ctx, q, args...).Scan(&out) + + switch { + case err == nil: + return out, nil + case errors.Is(err, sql.ErrNoRows): + return "", nil + default: + return "", err + } +} + +func (e *engine) UpsertZedToken(ctx context.Context, id gidx.PrefixedID, zedToken string) error { + tx, err := getContextTx(ctx) + if err != nil { + return err + } + + queryStub := ` + UPSERT INTO zedtokens (resource_id, zedtoken, created_at, expires_at) + VALUES ($1, $2, current_timestamp(), current_timestamp() + (INTERVAL '1 hour')) + ` + if _, err := tx.ExecContext(ctx, queryStub, id, zedToken); err != nil { + return err + } + + return nil +}