From 20d9e7df35d32269bad1416f78076fff28a8dbfa Mon Sep 17 00:00:00 2001 From: John Schaeffer Date: Thu, 16 May 2024 13:25:51 -0400 Subject: [PATCH] Add design proposal and implementation for ZedTokens table (#257) * Add design proposal and implementation for ZedTokens table While we have some guarantees that we try to make around consistency in permissions-api, particularly around ZedTokens, these were not explicitly spelled out anywhere. This commit adds a design proposal for using CRDB itself to store ZedTokens for resources, rather than relying on streaming from SpiceDB or lookups in NATS KV. The motivations for the CRDB-based approach are that it grants us the consistency guarantees we want while also allowing for more flexibility than NATS provides around the location of a given leader for coordinating reads, while also removing a dependency from the permissions-api critical path. Signed-off-by: John Schaeffer * Run "go mod tidy" This commit just tidies up the module file. Signed-off-by: John Schaeffer * Remove the ZedToken cache from the query engine This commit removes the ZedToken cache from the query engine since we're not using it anymore. Signed-off-by: John Schaeffer * Simplify relationship updates, commit ZedTokens out of band This commit simplifies DeleteRelationships to use SpiceDB's WriteRelationships call under the hood and also updates updateRelationshipZedTokens to do work in an out of band transaction. Signed-off-by: John Schaeffer * Prevent reading of expired ZedTokens This commit updates ZedToken reads so that permissions-api doesn't read expired ZedTokens. 
Signed-off-by: John Schaeffer --------- Signed-off-by: John Schaeffer --- cmd/createrole.go | 19 +- cmd/kv.go | 30 --- cmd/server.go | 20 +- cmd/worker.go | 7 +- docs/20240515-zedtokens.md | 46 +++++ go.mod | 3 +- go.sum | 1 + internal/query/relations.go | 78 ++------ internal/query/relations_test.go | 18 +- internal/query/rolebindings.go | 6 +- internal/query/service.go | 18 +- internal/query/zedtokens.go | 180 ++++++++---------- internal/query/zedtokens_test.go | 12 +- .../migrations/20240515000000_zedtokens.sql | 15 ++ internal/storage/storage.go | 1 + internal/storage/zedtokens.go | 63 ++++++ 16 files changed, 235 insertions(+), 282 deletions(-) delete mode 100644 cmd/kv.go create mode 100644 docs/20240515-zedtokens.md create mode 100644 internal/storage/migrations/20240515000000_zedtokens.sql create mode 100644 internal/storage/zedtokens.go diff --git a/cmd/createrole.go b/cmd/createrole.go index 594f0157..9b998c2e 100644 --- a/cmd/createrole.go +++ b/cmd/createrole.go @@ -6,7 +6,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" "go.infratographer.com/x/crdbx" - "go.infratographer.com/x/events" "go.infratographer.com/x/gidx" "go.infratographer.com/x/viperx" @@ -65,16 +64,6 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } - eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) - if err != nil { - logger.Fatalw("failed to initialize events", "error", err) - } - - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -109,17 +98,11 @@ func createRole(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("error parsing subject ID", "error", err) } - engine, err := query.NewEngine("infratographer", 
spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger)) if err != nil { logger.Fatalw("error creating engine", "error", err) } - defer func() { - if err := engine.Stop(); err != nil { - logger.Errorw("error stopping engine", "error", err) - } - }() - resource, err := engine.NewResourceFromID(resourceID) if err != nil { logger.Fatalw("error creating resource", "error", err) diff --git a/cmd/kv.go b/cmd/kv.go deleted file mode 100644 index a5c287e1..00000000 --- a/cmd/kv.go +++ /dev/null @@ -1,30 +0,0 @@ -package cmd - -import ( - "errors" - - "github.com/nats-io/nats.go" - "go.infratographer.com/x/events" - - "go.infratographer.com/permissions-api/internal/config" -) - -var ( - errInvalidSource = errors.New("events source must be a NATS connection") -) - -func initializeKV(cfg config.EventsConfig, eventsConn events.Connection) (nats.KeyValue, error) { - // While in theory the events package supports any kind of broker, in practice we only - // support NATS. 
- natsConn, ok := eventsConn.Source().(*nats.Conn) - if !ok { - return nil, errInvalidSource - } - - js, err := natsConn.JetStream() - if err != nil { - return nil, err - } - - return js.KeyValue(cfg.ZedTokenBucket) -} diff --git a/cmd/server.go b/cmd/server.go index cfb947b3..4e1d73fe 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -8,7 +8,6 @@ import ( "go.infratographer.com/x/crdbx" "go.infratographer.com/x/echojwtx" "go.infratographer.com/x/echox" - "go.infratographer.com/x/events" "go.infratographer.com/x/otelx" "go.infratographer.com/x/versionx" "go.uber.org/zap" @@ -39,7 +38,6 @@ func init() { echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen) otelx.MustViperFlags(v, serverCmd.Flags()) echojwtx.MustViperFlags(v, serverCmd.Flags()) - events.MustViperFlags(v, serverCmd.Flags(), appName) } func serve(_ context.Context, cfg *config.AppConfig) { @@ -53,16 +51,6 @@ func serve(_ context.Context, cfg *config.AppConfig) { logger.Fatalw("unable to initialize spicedb client", "error", err) } - eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger)) - if err != nil { - logger.Fatalw("failed to initialize events", "error", err) - } - - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -87,17 +75,11 @@ func serve(_ context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger)) if err != nil { logger.Fatalw("error creating engine", "error", err) } - defer func() { - if err := engine.Stop(); err != nil { - logger.Errorw("error 
stopping engine", "error", err) - } - }() - srv, err := echox.NewServer( logger.Desugar(), echox.ConfigFromViper(viper.GetViper()), diff --git a/cmd/worker.go b/cmd/worker.go index 3f18d9bb..5ab6d610 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -59,11 +59,6 @@ func worker(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("failed to initialize events", "error", err) } - kv, err := initializeKV(cfg.Events, eventsConn) - if err != nil { - logger.Fatalw("failed to initialize KV", "error", err) - } - db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled) if err != nil { logger.Fatalw("unable to initialize permissions-api database", "error", err) @@ -88,7 +83,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) { logger.Fatalw("invalid spicedb policy", "error", err) } - engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy)) + engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy)) if err != nil { logger.Fatalw("error creating engine", "error", err) } diff --git a/docs/20240515-zedtokens.md b/docs/20240515-zedtokens.md new file mode 100644 index 00000000..18a2c0eb --- /dev/null +++ b/docs/20240515-zedtokens.md @@ -0,0 +1,46 @@ +# Consistency with ZedTokens + +## Overview + +In SpiceDB, resources are arranged in a graph where different services may be contributing to that graph. For example, permissions-api itself manages roles and role bindings, while tenant-api manages tenants. Resources are updated over NATS request/reply. + +SpiceDB breaks updates down into quantized revisions of configurable duration, meaning ACL updates within a given revision window are not visible by default until the next revision begins. Using ZedTokens, however, clients can request data at least as fresh as a given exact point in time. This allows for clients to control the consistency they need for a given SpiceDB request on a per-request basis. 
+ +In general, services expect that immediately after updating relationships in permissions-api, they can make authorization decisions based on data at least as fresh as when they made those updates. + +## Goals + +This proposal is meant to achieve the following goals: + +* Permissions checks made on a resource after it is updated must use data at least as fresh as the last update to that resource + +## Non-goals + +This proposal is not meant to achieve the following goals: + +* Updates to the graph are globally and immediately propagated + +## Proposed solution + +To meet this goal, permissions-api can be updated to use a table in CRDB to store ZedTokens for recently updated resources. By doing this, permissions-api becomes responsible for determining the freshness of lookups. On permissions checks for a given resource, the following consistency strategy is used: + +* If a ZedToken exists for that resource, use `at_least_as_fresh` with the given ZedToken +* If no ZedToken exists, use `minimize_latency` + +The advantage of this approach is that it keeps management of consistency within permissions-api, meaning future changes do not (necessarily) require updates to other services, nor does it change current API semantics. Additionally, it does not introduce new dependencies to permissions-api, instead leveraging CRDB's availability and fault tolerance guarantees. + +## Constraints and limitations + +As designed, this only provides immediately visible updates for a given resource; other indirectly affected resources are not updated (but could be in the future). Additionally, given CRDB leaseholder behavior, multi-region deployments may see longer durations when performing permissions checks if the ZedToken table is not a global table. + +## Alternatives considered + +### Stream ZedToken updates using the SpiceDB Watch API + +We could minimize lookup time for ZedTokens by using the SpiceDB Watch API and streaming updates to all permissions-api replicas. 
However, this means updates will not be immediately visible for a given resource globally. While tokens could be stored in clients (i.e., using the IAM runtime), this merely pushes the complexity of managing ZedTokens to downstream services. + +### Store ZedTokens using NATS KV + +ZedTokens could be stored using KV in NATS as a coordination mechanism between servers globally. NATS clusters are in general meant to have very low latency between servers, where a single cluster coordinates a given stream (and thus a given KV bucket). This means that for immediate updates to resources using KV, all permissions-api replicas either need to read from a single NATS cluster (introducing latency as a function of distance from the cluster) or use a large cluster that spans many regions. The latter approach is [explicitly discouraged][nats-discussion] by NATS maintainers. + +[nats-discussion]: https://github.com/nats-io/nats-server/discussions/5317#discussioncomment-9138192 diff --git a/go.mod b/go.mod index 0e50ccbc..45c26730 100644 --- a/go.mod +++ b/go.mod @@ -7,10 +7,8 @@ require ( github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b github.com/cockroachdb/cockroach-go/v2 v2.3.7 github.com/go-jose/go-jose/v4 v4.0.1 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jackc/pgx/v5 v5.5.5 github.com/labstack/echo/v4 v4.11.4 - github.com/nats-io/nats.go v1.34.1 github.com/pkg/errors v0.9.1 github.com/pressly/goose/v3 v3.19.2 github.com/spf13/cobra v1.8.0 @@ -68,6 +66,7 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/nats-io/jwt/v2 v2.5.5 // indirect github.com/nats-io/nats-server/v2 v2.10.12 // indirect + github.com/nats-io/nats.go v1.34.1 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 29c8f71a..e6ca63f6 100644 --- a/go.sum +++ b/go.sum @@ -126,6 +126,7 @@ github.com/grpc-ecosystem/go-grpc-middleware 
v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= diff --git a/internal/query/relations.go b/internal/query/relations.go index ae807ab4..ed965c8a 100644 --- a/internal/query/relations.go +++ b/internal/query/relations.go @@ -113,7 +113,7 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc defer span.End() - consistency, consName := e.determineConsistency(resource) + consistency, consName := e.determineConsistency(ctx, resource) span.SetAttributes( attribute.String( "permissions.consistency", @@ -284,7 +284,7 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations } } - relUpdates := e.relationshipsToUpdates(rels) + relUpdates := e.relationshipsToUpdates(rels, pb.RelationshipUpdate_OPERATION_TOUCH) request := &pb.WriteRelationshipsRequest{ Updates: relUpdates, @@ -591,7 +591,7 @@ func (e *engine) roleResourceRelationshipsTouchDelete(roleResource, resource typ return rels } -func (e *engine) relationshipsToUpdates(rels []types.Relationship) []*pb.RelationshipUpdate { +func (e *engine) relationshipsToUpdates(rels []types.Relationship, operation pb.RelationshipUpdate_Operation) []*pb.RelationshipUpdate { relUpdates := make([]*pb.RelationshipUpdate, len(rels)) for i, rel := range rels { @@ -599,7 +599,7 @@ func (e *engine) relationshipsToUpdates(rels []types.Relationship) 
[]*pb.Relatio resRef := resourceToSpiceDBRef(e.namespace, rel.Resource) relUpdates[i] = &pb.RelationshipUpdate{ - Operation: pb.RelationshipUpdate_OPERATION_TOUCH, + Operation: operation, Relationship: &pb.Relationship{ Resource: resRef, Relation: rel.Relation, @@ -677,65 +677,22 @@ func (e *engine) DeleteRelationships(ctx context.Context, relationships ...types return multierr.Combine(errors...) } - errors = []error{} - - var ( - complete []types.Relationship - dErr error - cErr error - ) - - span.AddEvent("deleting relationships") - - for i, relationship := range relationships { - resType := e.namespace + "/" + relationship.Resource.Type - subjType := e.namespace + "/" + relationship.Subject.Type - - filter := &pb.RelationshipFilter{ - ResourceType: resType, - OptionalResourceId: relationship.Resource.ID.String(), - OptionalRelation: relationship.Relation, - OptionalSubjectFilter: &pb.SubjectFilter{ - SubjectType: subjType, - OptionalSubjectId: relationship.Subject.ID.String(), - }, - } - - if dErr = e.deleteRelationships(ctx, filter); dErr != nil { - e.logger.Errorf("%w: failed to delete relationship %d reverting %d completed deletes", dErr, i, len(complete)) + relUpdates := e.relationshipsToUpdates(relationships, pb.RelationshipUpdate_OPERATION_DELETE) - err := fmt.Errorf("%w: failed to delete relationship %d", dErr, i) - - span.RecordError(err) - - errors = append(errors, err) - - break - } - - complete = append(complete, relationship) + request := &pb.WriteRelationshipsRequest{ + Updates: relUpdates, } - if len(errors) != 0 { - span.SetStatus(codes.Error, "error occurred deleting relationships") - - if len(complete) != 0 { - span.AddEvent("recreating deleted relationships") - - if cErr = e.CreateRelationships(ctx, complete); cErr != nil { - e.logger.Error("%w: failed to revert %d deleted relationships", cErr, len(complete)) - - err := fmt.Errorf("%w: failed to revert deleted relationships", cErr) - - span.RecordError(err) - - errors = append(errors, err) - } 
- } + resp, err := e.client.WriteRelationships(ctx, request) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) - return multierr.Combine(errors...) + return err } + e.updateRelationshipZedTokens(ctx, relationships, resp.WrittenAt.Token) + return nil } @@ -1266,7 +1223,12 @@ func (e *engine) rollbackUpdates(ctx context.Context, updates []*pb.Relationship }) } - return e.applyUpdates(ctx, rollbacks) + _, err := e.client.WriteRelationships(ctx, &pb.WriteRelationshipsRequest{Updates: rollbacks}) + if err != nil { + return err + } + + return nil } // applyUpdates is a wrapper function around the spiceDB WriteRelationships method diff --git a/internal/query/relations_test.go b/internal/query/relations_test.go index 24dfcff3..62860f04 100644 --- a/internal/query/relations_test.go +++ b/internal/query/relations_test.go @@ -6,12 +6,10 @@ import ( pb "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/authzed-go/v1" - "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.infratographer.com/x/gidx" - "go.infratographer.com/x/testing/eventtools" "go.infratographer.com/permissions-api/internal/iapl" "go.infratographer.com/permissions-api/internal/spicedbx" @@ -40,16 +38,6 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl _, err = client.WriteSchema(ctx, request) require.NoError(t, err) - natsSrv, err := eventtools.NewNatsServer() - require.NoError(t, err) - - kvCfg := nats.KeyValueConfig{ - Bucket: "zedtokens", - } - - kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg) - require.NoError(t, err) - t.Cleanup(func() { cleanDB(ctx, t, client, namespace, policy) cleanStore() @@ -57,13 +45,9 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl // We call the constructor here to ensure the engine is created appropriately, but // then return the underlying type so we can do testing with it. 
- out, err := NewEngine(namespace, client, kv, store, WithPolicy(policy)) + out, err := NewEngine(namespace, client, store, WithPolicy(policy)) require.NoError(t, err) - t.Cleanup(func() { - out.Stop() //nolint:errcheck - }) - return out.(*engine) } diff --git a/internal/query/rolebindings.go b/internal/query/rolebindings.go index 845319ef..80b00349 100644 --- a/internal/query/rolebindings.go +++ b/internal/query/rolebindings.go @@ -204,7 +204,7 @@ func (e *engine) CreateRoleBinding( updates = append(updates, subjUpdates...) - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) @@ -306,7 +306,7 @@ func (e *engine) DeleteRoleBinding(ctx context.Context, rb types.Resource) error } // apply changes - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) @@ -503,7 +503,7 @@ func (e *engine) UpdateRoleBinding(ctx context.Context, actor, rb types.Resource updates = append(updates, update) } - if err := e.applyUpdates(ctx, updates); err != nil { + if err := e.applyUpdates(dbCtx, updates); err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) diff --git a/internal/query/service.go b/internal/query/service.go index 4c6d647c..af600f98 100644 --- a/internal/query/service.go +++ b/internal/query/service.go @@ -4,8 +4,6 @@ import ( "context" "github.com/authzed/authzed-go/v1" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/nats-io/nats.go" "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" @@ -77,8 +75,6 @@ type Engine interface { GetRoleBindingResource(ctx context.Context, rb types.Resource) 
(types.Resource, error) AllActions() []string - - Stop() error } type engine struct { @@ -86,9 +82,6 @@ type engine struct { logger *zap.SugaredLogger namespace string client *authzed.Client - kv nats.KeyValue - keyWatcher nats.KeyWatcher - zedTokenCache *expirable.LRU[string, string] store storage.Storage schema []types.ResourceType schemaPrefixMap map[string]types.ResourceType @@ -166,14 +159,13 @@ func resourceHasRoleBindingV2(resType types.ResourceType) *types.ConditionRoleBi } // NewEngine returns a new client for making permissions queries. -func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, store storage.Storage, options ...Option) (Engine, error) { +func NewEngine(namespace string, client *authzed.Client, store storage.Storage, options ...Option) (Engine, error) { tracer := otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query") e := &engine{ logger: zap.NewNop().Sugar(), namespace: namespace, client: client, - kv: kv, store: store, tracer: tracer, } @@ -190,17 +182,9 @@ func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, store e.cacheSchemaResources() } - if err := e.initZedTokenCache(); err != nil { - return nil, err - } - return e, nil } -func (e *engine) Stop() error { - return e.keyWatcher.Stop() -} - // Option is a functional option for the engine type Option func(*engine) diff --git a/internal/query/zedtokens.go b/internal/query/zedtokens.go index e226dff6..58c79a14 100644 --- a/internal/query/zedtokens.go +++ b/internal/query/zedtokens.go @@ -2,16 +2,14 @@ package query import ( "context" - "errors" + + "go.infratographer.com/permissions-api/internal/types" pb "github.com/authzed/authzed-go/proto/authzed/api/v1" - "github.com/hashicorp/golang-lru/v2/expirable" - "github.com/nats-io/nats.go" + "go.infratographer.com/x/gidx" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" - - 
"go.infratographer.com/permissions-api/internal/types" ) const ( @@ -19,66 +17,6 @@ const ( consistencyAtLeastAsFresh = "at_least_as_fresh" ) -// initZedTokenCache creates a new LRU cache that watches KV for ZedToken updates. -func (e *engine) initZedTokenCache() error { - status, err := e.kv.Status() - if err != nil { - return err - } - - ttl := status.TTL() - - keyWatcher, err := e.kv.WatchAll() - if err != nil { - return err - } - - lru := expirable.NewLRU[string, string](0, nil, ttl) - - e.keyWatcher = keyWatcher - e.zedTokenCache = lru - - go func() { - for entry := range e.keyWatcher.Updates() { - if entry == nil { - continue - } - - key := entry.Key() - value := string(entry.Value()) - - _, span := e.tracer.Start( - context.Background(), - "populateZedTokenCache", - trace.WithAttributes( - attribute.String( - "permissions.resource", - key, - ), - ), - ) - - e.zedTokenCache.Add(key, value) - - span.End() - } - }() - - return nil -} - -// getLatestZedToken attempts to get the latest ZedToken for the given resource ID. -func (e *engine) getLatestZedToken(resourceID string) (string, bool) { - resp, ok := e.zedTokenCache.Get(resourceID) - if !ok { - return "", false - } - - zedToken := string(resp) - - return zedToken, true -} - // upsertZedToken updates the ZedToken at the given resource ID key with the provided ZedToken. func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken string) error { _, span := e.tracer.Start( @@ -94,25 +32,13 @@ func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken defer span.End() - zedTokenBytes := []byte(zedToken) - - // Attempt to get a ZedToken. If we found one, update it. If not, create it. If some other error - // happened, log that and return - resp, getErr := e.kv.Get(resourceID) - - var err error - - switch { - // If we found a token, update it. 
This may fail if another client updated it before we did - case getErr == nil: - _, err = e.kv.Update(resourceID, zedTokenBytes, resp.Revision()) - // If we did not find a token, create it. This may fail if another client created an entry already - case errors.Is(getErr, nats.ErrKeyNotFound): - _, err = e.kv.Create(resourceID, zedTokenBytes) - // If something else happened, just keep moving - default: + prefixedID, err := gidx.Parse(resourceID) + if err != nil { + return err } + err = e.store.UpsertZedToken(ctx, prefixedID, zedToken) + // If an error happened when creating or updating the token, record it. if err != nil { span.RecordError(err) @@ -124,8 +50,11 @@ func (e *engine) upsertZedToken(ctx context.Context, resourceID string, zedToken return nil } -// updateRelationshipZedTokens updates the NATS KV bucket for ZedTokens, setting the given ZedToken +// updateRelationshipZedTokens updates the CRDB table for ZedTokens, setting the given ZedToken // as the latest point in time snapshot for every resource in the given list of relationships. +// +// This function updates the table using an out of band transaction, as if it fails we do not want +// to roll back the entire outer transaction. 
func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.Relationship, zedToken string) { resourceIDMap := map[string]struct{}{} for _, rel := range rels { @@ -133,11 +62,39 @@ func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.R resourceIDMap[rel.Subject.ID.String()] = struct{}{} } + ctx, span := e.tracer.Start( + ctx, + "updateRelationshipZedTokens", + ) + + defer span.End() + + dbCtx, err := e.store.BeginContext(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return + } + for resourceID := range resourceIDMap { - if err := e.upsertZedToken(ctx, resourceID, zedToken); err != nil { + if err := e.upsertZedToken(dbCtx, resourceID, zedToken); err != nil { e.logger.Warnw("error upserting ZedToken", "error", err.Error(), "resource_id", resourceID) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) + + return } } + + if err = e.store.CommitContext(dbCtx); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + logRollbackErr(e.logger, e.store.RollbackContext(dbCtx)) + } } // determineConsistency produces a consistency strategy based on whether a ZedToken exists for a @@ -145,27 +102,48 @@ func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.R // retrieved ZedToken. If no such token is found, minimize_latency is used. This ensures that if // NATS is not working or available for some reason, we can still make permissions checks (albeit // in a degraded state). 
-func (e *engine) determineConsistency(resource types.Resource) (*pb.Consistency, string) { - resourceID := resource.ID.String() - - zedToken, ok := e.getLatestZedToken(resourceID) - if !ok { - consistency := &pb.Consistency{ - Requirement: &pb.Consistency_MinimizeLatency{ - MinimizeLatency: true, - }, - } +func (e *engine) determineConsistency(ctx context.Context, resource types.Resource) (*pb.Consistency, string) { + resourceID := resource.ID - return consistency, consistencyMinimizeLatency - } + _, span := e.tracer.Start( + ctx, + "determineConsistency", + trace.WithAttributes( + attribute.Stringer( + "permissions.resource", + resourceID, + ), + ), + ) + + defer span.End() consistency := &pb.Consistency{ - Requirement: &pb.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: &pb.ZedToken{ - Token: zedToken, - }, + Requirement: &pb.Consistency_MinimizeLatency{ + MinimizeLatency: true, }, } - return consistency, consistencyAtLeastAsFresh + consistencyName := consistencyMinimizeLatency + + zedToken, err := e.store.GetLatestZedToken(ctx, resourceID) + + switch { + case err != nil: + e.logger.Warnw("error getting ZedToken", "error", err.Error()) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + case zedToken != "": + consistency = &pb.Consistency{ + Requirement: &pb.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: &pb.ZedToken{ + Token: zedToken, + }, + }, + } + + consistencyName = consistencyAtLeastAsFresh + } + + return consistency, consistencyName } diff --git a/internal/query/zedtokens_test.go b/internal/query/zedtokens_test.go index 52935c22..e61ebf69 100644 --- a/internal/query/zedtokens_test.go +++ b/internal/query/zedtokens_test.go @@ -7,7 +7,6 @@ import ( "go.infratographer.com/permissions-api/internal/testingx" "go.infratographer.com/permissions-api/internal/types" - "github.com/nats-io/nats.go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.infratographer.com/x/gidx" @@ -46,19 +45,10 @@ func TestConsistency(t 
*testing.T) { }, } - // Watch for updates to the key to avoid racing - kw, err := e.kv.Watch(tenantID.String(), nats.UpdatesOnly()) - require.NoError(t, err) - - defer kw.Stop() //nolint:errcheck - err = e.CreateRelationships(ctx, rels) require.NoError(t, err) - // Wait until we know an update occurred - <-kw.Updates() - return ctx }, CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[string]) { @@ -77,7 +67,7 @@ func TestConsistency(t *testing.T) { } testFn := func(ctx context.Context, res types.Resource) testingx.TestResult[string] { - _, consistencyName := e.determineConsistency(res) + _, consistencyName := e.determineConsistency(ctx, res) out := testingx.TestResult[string]{ Success: consistencyName, diff --git a/internal/storage/migrations/20240515000000_zedtokens.sql b/internal/storage/migrations/20240515000000_zedtokens.sql new file mode 100644 index 00000000..de5ee9bc --- /dev/null +++ b/internal/storage/migrations/20240515000000_zedtokens.sql @@ -0,0 +1,15 @@ +-- +goose Up + +-- create "zedtokens" table +CREATE TABLE "zedtokens" ( + "resource_id" character varying NOT NULL, + "zedtoken" character varying NOT NULL, + "created_at" timestamptz NOT NULL, + "expires_at" timestamptz NOT NULL, + PRIMARY KEY ("resource_id") +) WITH (ttl_expiration_expression = 'expires_at', ttl_job_cron = '0 */4 * * *'); + +-- +goose Down + +-- drop "zedtokens" table +DROP TABLE "zedtokens"; diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 95b60dc9..11c7bbd0 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -12,6 +12,7 @@ import ( type Storage interface { RoleService RoleBindingService + ZedTokenService TransactionManager HealthCheck(ctx context.Context) error diff --git a/internal/storage/zedtokens.go b/internal/storage/zedtokens.go new file mode 100644 index 00000000..12d2260c --- /dev/null +++ b/internal/storage/zedtokens.go @@ -0,0 +1,63 @@ +package storage + +import ( + "context" + "database/sql" + 
"errors" + "fmt" + + "go.infratographer.com/x/gidx" +) + +// ZedTokenService represents a service for getting and updating ZedTokens for resources. +type ZedTokenService interface { + GetLatestZedToken(ctx context.Context, ids ...gidx.PrefixedID) (string, error) + UpsertZedToken(ctx context.Context, id gidx.PrefixedID, zedToken string) error +} + +func (e *engine) GetLatestZedToken(ctx context.Context, ids ...gidx.PrefixedID) (string, error) { + db, err := getContextDBQuery(ctx, e) + if err != nil { + return "", err + } + + inClause, args := e.buildBatchInClauseWithIDs(ids) + q := fmt.Sprintf(` + SELECT zedtoken + FROM zedtokens + WHERE resource_id IN (%s) + AND current_timestamp() < expires_at + ORDER BY created_at DESC + LIMIT 1 + `, inClause) + + var out string + + err = db.QueryRowContext(ctx, q, args...).Scan(&out) + + switch { + case err == nil: + return out, nil + case errors.Is(err, sql.ErrNoRows): + return "", nil + default: + return "", err + } +} + +func (e *engine) UpsertZedToken(ctx context.Context, id gidx.PrefixedID, zedToken string) error { + tx, err := getContextTx(ctx) + if err != nil { + return err + } + + queryStub := ` + UPSERT INTO zedtokens (resource_id, zedtoken, created_at, expires_at) + VALUES ($1, $2, current_timestamp(), current_timestamp() + (INTERVAL '1 hour')) + ` + if _, err := tx.ExecContext(ctx, queryStub, id, zedToken); err != nil { + return err + } + + return nil +}