Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add design proposal and implementation for ZedTokens table #257

Merged
merged 5 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions cmd/createrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.infratographer.com/x/crdbx"
"go.infratographer.com/x/events"
"go.infratographer.com/x/gidx"
"go.infratographer.com/x/viperx"

Expand Down Expand Up @@ -65,16 +64,6 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled)
if err != nil {
logger.Fatalw("unable to initialize permissions-api database", "error", err)
Expand Down Expand Up @@ -109,17 +98,11 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("error parsing subject ID", "error", err)
}

engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger))
engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

defer func() {
if err := engine.Stop(); err != nil {
logger.Errorw("error stopping engine", "error", err)
}
}()

resource, err := engine.NewResourceFromID(resourceID)
if err != nil {
logger.Fatalw("error creating resource", "error", err)
Expand Down
30 changes: 0 additions & 30 deletions cmd/kv.go

This file was deleted.

20 changes: 1 addition & 19 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"go.infratographer.com/x/crdbx"
"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/echox"
"go.infratographer.com/x/events"
"go.infratographer.com/x/otelx"
"go.infratographer.com/x/versionx"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,7 +38,6 @@ func init() {
echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen)
otelx.MustViperFlags(v, serverCmd.Flags())
echojwtx.MustViperFlags(v, serverCmd.Flags())
events.MustViperFlags(v, serverCmd.Flags(), appName)
}

func serve(_ context.Context, cfg *config.AppConfig) {
Expand All @@ -53,16 +51,6 @@ func serve(_ context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled)
if err != nil {
logger.Fatalw("unable to initialize permissions-api database", "error", err)
Expand All @@ -87,17 +75,11 @@ func serve(_ context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy), query.WithLogger(logger))
engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

defer func() {
if err := engine.Stop(); err != nil {
logger.Errorw("error stopping engine", "error", err)
}
}()

srv, err := echox.NewServer(
logger.Desugar(),
echox.ConfigFromViper(viper.GetViper()),
Expand Down
7 changes: 1 addition & 6 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

db, err := crdbx.NewDB(cfg.CRDB, cfg.Tracing.Enabled)
if err != nil {
logger.Fatalw("unable to initialize permissions-api database", "error", err)
Expand All @@ -88,7 +83,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine, err := query.NewEngine("infratographer", spiceClient, kv, store, query.WithPolicy(policy))
engine, err := query.NewEngine("infratographer", spiceClient, store, query.WithPolicy(policy))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}
Expand Down
46 changes: 46 additions & 0 deletions docs/20240515-zedtokens.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Consistency with ZedTokens

## Overview

In SpiceDB, resources are arranged in a graph where different services may be contributing to that graph. For example, permissions-api itself manages roles and role bindings, while tenant-api manages tenants. Resources are updated over NATS request/reply.

SpiceDB breaks updates down into quantized revisions of configurable duration, meaning ACL updates within a given revision window are not visible by default until the next revision begins. Using ZedTokens, however, clients can request data at least as fresh as a given exact point in time. This allows for clients to control the consistency they need for a given SpiceDB request on a per-request basis.

In general, services expect that immediately after updating relationships in permissions-api, they can make authorization decisions based on data at least as fresh as when they made those updates.

## Goals

This proposal is meant to achieve the following goals:

* Permissions checks made on a resource after it is updated must use data at least as fresh as the last update to that resource

## Non-goals

This proposal is not meant to achieve the following goals:

* Updates to the graph are globally and immediately propagated

## Proposed solution

To mitigate this issue, permissions-api can be updated to use a table in CRDB to populate ZedTokens for recently updated resources. By doing this, permissions-api becomes responsible for determining the freshness of lookups. On permissions checks for a given resource, the following consistency strategy is used:

* If a ZedToken exists for that resource, use `at_least_as_fresh` with the given ZedToken
* If no ZedToken exists, use `minimize_latency`

The advantage of this approach is that it keeps management of consistency within permissions-api, meaning future changes do not (necessarily) require updates to other services, nor does it change current API semantics. Additionally, it does not introduce new dependencies to permissions-api, instead leveraging CRDB's availability and fault tolerance guarantees.

## Constraints and limitations

As designed, this only provides immediately visible updates for a given resource; other indirectly affected resources are not updated (but could be in the future). Additionally, given CRDB leaseholder behavior, multi-region deployments may see longer durations when performing permissions checks if the ZedToken table is not a global table.

## Alternatives considered
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still trying to understand this full stack, so excuse my ignorance. It seems like another alternative is no ZedTokens at all. Reading this post seems to indicate that this won't happen in CRDB. Also it seems even less likely because of our prefixing.

Am I missing something in my reading of that post and our usage of SpiceDB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that blog post is primarily about controlling write ordering in CRDB, not ACL updates. There are a few strategies we can adopt if the current default of using a single key for ordering transactions becomes a problem, but we haven't run into scaling issues there.

Where things get more interesting is around ZedTokens. The use of ZedTokens isn't really specific to CRDB, but rather inherent to how Zanzibar and SpiceDB operate. SpiceDB revisions (known as snapshots in the Zanzibar paper) are generally some number of seconds long and reads happen within a given revision. All datastores, including CRDB, use revisions for reading data.

ZedTokens allow for the client to request data newer than whatever a given SpiceDB replica might have available in its cache or whatever the current revision it's using is. In a CRDB deployment environment that uses follower reads, this would mean requesting data that might be newer than what the nearest replica has, thus incurring a one-time round trip to the leaseholder coordinating work. Following this, the data are cached in memory in SpiceDB for the remainder of the revision duration. Other storage engines don't have the same horizontal/multi-region scaling capability that CRDB offers.

Not using ZedTokens would require either waiting for a new revision to elapse before ACL updates are visible or leveraging full consistency in permissions checks, which means we get none of the caching benefits SpiceDB provides.


### Stream ZedToken updates using the SpiceDB Watch API

We could minimize lookup time for ZedTokens by using the SpiceDB Watch API and streaming updates to all permissions-api replicas. However, this means updates will not be immediately visible for a given resource globally. While tokens could be stored in clients (i.e., using the IAM runtime), this merely pushes the complexity of managing ZedTokens to downstream services.

### Store ZedTokens using NATS KV

ZedTokens could be stored using KV in NATS as a coordination mechanism between servers globally. NATS clusters are in general meant to have very low latency between servers, where a single cluster coordinates a given stream (and thus a given KV bucket). This means that for immediate updates to resources using KV, all permissions-api replicas either need to read from a single NATS cluster (introducing latency as a function of distance from the cluster) or use a large cluster that spans many regions. The latter approach is [explicitly discouraged][nats-discussion] by NATS maintainers.

[nats-discussion]: https://github.com/nats-io/nats-server/discussions/5317#discussioncomment-9138192
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ require (
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/cockroachdb/cockroach-go/v2 v2.3.7
github.com/go-jose/go-jose/v4 v4.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jackc/pgx/v5 v5.5.5
github.com/labstack/echo/v4 v4.11.4
github.com/nats-io/nats.go v1.34.1
github.com/pkg/errors v0.9.1
github.com/pressly/goose/v3 v3.19.2
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -68,6 +66,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/nats-server/v2 v2.10.12 // indirect
github.com/nats-io/nats.go v1.34.1 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
Expand Down
78 changes: 20 additions & 58 deletions internal/query/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc

defer span.End()

consistency, consName := e.determineConsistency(resource)
consistency, consName := e.determineConsistency(ctx, resource)
span.SetAttributes(
attribute.String(
"permissions.consistency",
Expand Down Expand Up @@ -284,7 +284,7 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations
}
}

relUpdates := e.relationshipsToUpdates(rels)
relUpdates := e.relationshipsToUpdates(rels, pb.RelationshipUpdate_OPERATION_TOUCH)

request := &pb.WriteRelationshipsRequest{
Updates: relUpdates,
Expand Down Expand Up @@ -591,15 +591,15 @@ func (e *engine) roleResourceRelationshipsTouchDelete(roleResource, resource typ
return rels
}

func (e *engine) relationshipsToUpdates(rels []types.Relationship) []*pb.RelationshipUpdate {
func (e *engine) relationshipsToUpdates(rels []types.Relationship, operation pb.RelationshipUpdate_Operation) []*pb.RelationshipUpdate {
relUpdates := make([]*pb.RelationshipUpdate, len(rels))

for i, rel := range rels {
subjRef := resourceToSpiceDBRef(e.namespace, rel.Subject)
resRef := resourceToSpiceDBRef(e.namespace, rel.Resource)

relUpdates[i] = &pb.RelationshipUpdate{
Operation: pb.RelationshipUpdate_OPERATION_TOUCH,
Operation: operation,
Relationship: &pb.Relationship{
Resource: resRef,
Relation: rel.Relation,
Expand Down Expand Up @@ -677,65 +677,22 @@ func (e *engine) DeleteRelationships(ctx context.Context, relationships ...types
return multierr.Combine(errors...)
}

errors = []error{}

var (
complete []types.Relationship
dErr error
cErr error
)

span.AddEvent("deleting relationships")

for i, relationship := range relationships {
resType := e.namespace + "/" + relationship.Resource.Type
subjType := e.namespace + "/" + relationship.Subject.Type

filter := &pb.RelationshipFilter{
ResourceType: resType,
OptionalResourceId: relationship.Resource.ID.String(),
OptionalRelation: relationship.Relation,
OptionalSubjectFilter: &pb.SubjectFilter{
SubjectType: subjType,
OptionalSubjectId: relationship.Subject.ID.String(),
},
}

if dErr = e.deleteRelationships(ctx, filter); dErr != nil {
e.logger.Errorf("%w: failed to delete relationship %d reverting %d completed deletes", dErr, i, len(complete))
relUpdates := e.relationshipsToUpdates(relationships, pb.RelationshipUpdate_OPERATION_DELETE)

err := fmt.Errorf("%w: failed to delete relationship %d", dErr, i)

span.RecordError(err)

errors = append(errors, err)

break
}

complete = append(complete, relationship)
request := &pb.WriteRelationshipsRequest{
Updates: relUpdates,
}

if len(errors) != 0 {
span.SetStatus(codes.Error, "error occurred deleting relationships")

if len(complete) != 0 {
span.AddEvent("recreating deleted relationships")

if cErr = e.CreateRelationships(ctx, complete); cErr != nil {
e.logger.Error("%w: failed to revert %d deleted relationships", cErr, len(complete))

err := fmt.Errorf("%w: failed to revert deleted relationships", cErr)

span.RecordError(err)

errors = append(errors, err)
}
}
resp, err := e.client.WriteRelationships(ctx, request)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return multierr.Combine(errors...)
return err
}

e.updateRelationshipZedTokens(ctx, relationships, resp.WrittenAt.Token)

return nil
}

Expand Down Expand Up @@ -1266,7 +1223,12 @@ func (e *engine) rollbackUpdates(ctx context.Context, updates []*pb.Relationship
})
}

return e.applyUpdates(ctx, rollbacks)
_, err := e.client.WriteRelationships(ctx, &pb.WriteRelationshipsRequest{Updates: rollbacks})
if err != nil {
return err
}

return nil
}

// applyUpdates is a wrapper function around the spiceDB WriteRelationships method
Expand Down
18 changes: 1 addition & 17 deletions internal/query/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (

pb "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/authzed-go/v1"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.infratographer.com/x/gidx"
"go.infratographer.com/x/testing/eventtools"

"go.infratographer.com/permissions-api/internal/iapl"
"go.infratographer.com/permissions-api/internal/spicedbx"
Expand Down Expand Up @@ -40,30 +38,16 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl
_, err = client.WriteSchema(ctx, request)
require.NoError(t, err)

natsSrv, err := eventtools.NewNatsServer()
require.NoError(t, err)

kvCfg := nats.KeyValueConfig{
Bucket: "zedtokens",
}

kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg)
require.NoError(t, err)

t.Cleanup(func() {
cleanDB(ctx, t, client, namespace, policy)
cleanStore()
})

// We call the constructor here to ensure the engine is created appropriately, but
// then return the underlying type so we can do testing with it.
out, err := NewEngine(namespace, client, kv, store, WithPolicy(policy))
out, err := NewEngine(namespace, client, store, WithPolicy(policy))
require.NoError(t, err)

t.Cleanup(func() {
out.Stop() //nolint:errcheck
})

return out.(*engine)
}

Expand Down
Loading
Loading