Skip to content

Commit

Permalink
Cache ZedTokens for resources using NATS (#209)
Browse files Browse the repository at this point in the history
* Cache ZedTokens for resources using NATS

Using full consistency when doing permissions checks is slow. In
general, this is addressed by using ZedTokens to indicate minimum
bounds on freshness when looking up cached data. Something has to keep
track of those tokens, either on the client side or server-side.

This commit introduces worker caching of ZedTokens for resources on
updates to relationships and updates the query engine to use those
tokens when performing permissions checks. When a worker updates a
relationship, it persists the ZedToken for all resources directly
affected by that update to a NATS KV bucket. NATS KV writes are
immediately consistent, so the new ZedToken for that resource is
available to all consumers, including permissions-api API
frontends. When the query engine performs a permissions check, it
checks to see if a ZedToken is available for the resource.

If a ZedToken was found, that ZedToken is used along with the
at_least_as_fresh SpiceDB API consistency strategy. If not, or if
there was an error accessing NATS, the query engine falls back to the
minimize_latency API consistency strategy. If the NATS KV bucket is
configured with a TTL at least as high as the quantization interval
for SpiceDB, this ensures that by the time the ZedToken is evicted
from the cache, all SpiceDB frontends will be updated with data at
least as fresh as the last relationship update for a resource. Clients
that wish to force an update for a resource (e.g., making role changes
immediately available to tenant users) can thus issue a relationship
update to permissions-api and get the latest data for that resource.

This commit assumes that the KV bucket used already exists;
permissions-api will not attempt to create it. This is because the
intention is that the KV bucket has a TTL set to something close to
the SpiceDB quantization interval, which permissions-api is not
necessarily aware of.

Signed-off-by: John Schaeffer <[email protected]>

* Update Helm chart to support ZedToken cache

This commit adds the necessary configs to the Helm chart to support
populating a ZedToken cache for permissions-api.

Signed-off-by: John Schaeffer <[email protected]>

* Add NATS creds to server deployment

This commit adds NATS creds to the server deployment in the Helm
chart.

Signed-off-by: John Schaeffer <[email protected]>

* Add tests for determining consistency

This commit adds tests for determineConsistency.

Signed-off-by: John Schaeffer <[email protected]>

---------

Signed-off-by: John Schaeffer <[email protected]>
  • Loading branch information
jnschaeffer authored Jan 16, 2024
1 parent 74bed8f commit 1d6d177
Show file tree
Hide file tree
Showing 15 changed files with 382 additions and 24 deletions.
9 changes: 9 additions & 0 deletions chart/permissions-api/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
secret:
secretName: {{ . }}
{{- end }}
{{- with .Values.config.events.nats.credsSecretName }}
- name: nats-creds
secret:
secretName: {{ . }}
{{- end }}
{{- with .Values.config.spicedb.policyConfigMapName }}
- name: policy-file
configMap:
Expand All @@ -27,6 +32,10 @@
- name: spicedb-ca
mountPath: /etc/ssl/spicedb/
{{- end }}
{{- if .Values.config.events.nats.credsSecretName }}
- name: nats-creds
mountPath: /nats
{{- end }}
{{- if .Values.config.spicedb.policyConfigMapName }}
- name: policy-file
mountPath: /policy
Expand Down
2 changes: 1 addition & 1 deletion chart/permissions-api/templates/config-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ metadata:
service: server
data:
config.yaml: |
{{- pick .Values.config "server" "oidc" "spicedb" "tracing" | toYaml | nindent 4 }}
{{- pick .Values.config "server" "oidc" "spicedb" "tracing" "events" | toYaml | nindent 4 }}
3 changes: 3 additions & 0 deletions chart/permissions-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ config:
policyConfigMapName: ""

events:
# zedTokenBucket is the NATS bucket to use for caching ZedTokens
zedTokenBucket: ""

# topics are the list of topics to subscribe to
topics: []

Expand Down
16 changes: 15 additions & 1 deletion cmd/createrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.infratographer.com/x/events"
"go.infratographer.com/x/gidx"
"go.infratographer.com/x/viperx"

Expand Down Expand Up @@ -59,6 +60,16 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

var policy iapl.Policy

if cfg.SpiceDB.PolicyFile != "" {
Expand Down Expand Up @@ -86,7 +97,10 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("error parsing subject ID", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))
engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

resource, err := engine.NewResourceFromID(resourceID)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions cmd/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cmd

import (
"errors"

"github.com/nats-io/nats.go"
"go.infratographer.com/x/events"

"go.infratographer.com/permissions-api/internal/config"
)

var (
errInvalidSource = errors.New("events source must be a NATS connection")
)

func initializeKV(cfg config.EventsConfig, eventsConn events.Connection) (nats.KeyValue, error) {
// While in theory the events package supports any kind of broker, in practice we only
// support NATS.
natsConn, ok := eventsConn.Source().(*nats.Conn)
if !ok {
return nil, errInvalidSource
}

js, err := natsConn.JetStream()
if err != nil {
return nil, err
}

return js.KeyValue(cfg.ZedTokenBucket)
}
17 changes: 16 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/spf13/viper"
"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/echox"
"go.infratographer.com/x/events"
"go.infratographer.com/x/otelx"
"go.infratographer.com/x/versionx"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,6 +39,7 @@ func init() {
echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen)
otelx.MustViperFlags(v, serverCmd.Flags())
echojwtx.MustViperFlags(v, serverCmd.Flags())
events.MustViperFlags(v, serverCmd.Flags(), appName)
}

func serve(ctx context.Context, cfg *config.AppConfig) {
Expand All @@ -51,6 +53,16 @@ func serve(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

var policy iapl.Policy

if cfg.SpiceDB.PolicyFile != "" {
Expand All @@ -68,7 +80,10 @@ func serve(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy))
engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

srv, err := echox.NewServer(
logger.Desugar(),
Expand Down
18 changes: 13 additions & 5 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,22 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))

events, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

subscriber, err := pubsub.NewSubscriber(ctx, events, engine,
kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

subscriber, err := pubsub.NewSubscriber(ctx, eventsConn, engine,
pubsub.WithLogger(logger),
)
if err != nil {
Expand Down Expand Up @@ -140,7 +148,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

defer cancel()

if err := events.Shutdown(ctx); err != nil {
if err := eventsConn.Shutdown(ctx); err != nil {
logger.Fatalw("failed to shutdown events gracefully", "error", "err")
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/authzed/authzed-go v0.10.1
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
github.com/labstack/echo/v4 v4.11.3
github.com/nats-io/nats.go v1.31.0
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.10.4 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
Expand Down
8 changes: 6 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (

// EventsConfig stores the configuration for a load-balancer-api events config
type EventsConfig struct {
events.Config `mapstructure:",squash"`
Topics []string
events.Config `mapstructure:",squash"`
Topics []string
ZedTokenBucket string
}

// AppConfig is the struct used for configuring the app
Expand All @@ -34,4 +35,7 @@ type AppConfig struct {
func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet) {
flags.StringSlice("events-topics", []string{}, "event topics to subscribe to")
viperx.MustBindFlag(v, "events.topics", flags.Lookup("events-topics"))

flags.String("events-zedtokenbucket", "", "NATS KV bucket to use for caching ZedTokens")
viperx.MustBindFlag(v, "events.zedtokenbucket", flags.Lookup("events-zedtokenbucket"))
}
23 changes: 15 additions & 8 deletions internal/query/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc

defer span.End()

consistency, consName := e.determineConsistency(ctx, resource)
span.SetAttributes(
attribute.String(
"permissions.consistency",
consName,
),
)

req := &pb.CheckPermissionRequest{
Consistency: &pb.Consistency{
Requirement: &pb.Consistency_FullyConsistent{
FullyConsistent: true,
},
},
Resource: resourceToSpiceDBRef(e.namespace, resource),
Permission: action,
Consistency: consistency,
Resource: resourceToSpiceDBRef(e.namespace, resource),
Permission: action,
Subject: &pb.SubjectReference{
Object: resourceToSpiceDBRef(e.namespace, subject),
},
Expand Down Expand Up @@ -256,13 +260,16 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations
Updates: relUpdates,
}

if _, err := e.client.WriteRelationships(ctx, request); err != nil {
resp, err := e.client.WriteRelationships(ctx, request)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

e.updateRelationshipZedTokens(ctx, rels, resp.WrittenAt.Token)

return nil
}

Expand Down
42 changes: 39 additions & 3 deletions internal/query/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (

pb "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/authzed-go/v1"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.infratographer.com/x/gidx"
"go.infratographer.com/x/testing/eventtools"

"go.infratographer.com/permissions-api/internal/iapl"
"go.infratographer.com/permissions-api/internal/spicedbx"
"go.infratographer.com/permissions-api/internal/testingx"
"go.infratographer.com/permissions-api/internal/types"
)

func testEngine(ctx context.Context, t *testing.T, namespace string) Engine {
func testEngine(ctx context.Context, t *testing.T, namespace string) *engine {
config := spicedbx.Config{
Endpoint: "spicedb:50051",
Key: "infradev",
Expand All @@ -36,13 +38,26 @@ func testEngine(ctx context.Context, t *testing.T, namespace string) Engine {
_, err = client.WriteSchema(ctx, request)
require.NoError(t, err)

natsSrv, err := eventtools.NewNatsServer()
require.NoError(t, err)

kvCfg := nats.KeyValueConfig{
Bucket: "zedtokens",
}

kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg)
require.NoError(t, err)

t.Cleanup(func() {
cleanDB(ctx, t, client, namespace)
})

out := NewEngine(namespace, client, WithPolicy(policy))
// We call the constructor here to ensure the engine is created appropriately, but
// then return the underlying type so we can do testing with it.
out, err := NewEngine(namespace, client, kv, WithPolicy(policy))
require.NoError(t, err)

return out
return out.(*engine)
}

func testPolicy() iapl.Policy {
Expand Down Expand Up @@ -557,6 +572,10 @@ func TestSubjectActions(t *testing.T) {
ctx := context.Background()
e := testEngine(ctx, t, namespace)

parentID, err := gidx.NewID("tnntten")
require.NoError(t, err)
parentRes, err := e.NewResourceFromID(parentID)
require.NoError(t, err)
tenID, err := gidx.NewID("tnntten")
require.NoError(t, err)
tenRes, err := e.NewResourceFromID(tenID)
Expand All @@ -580,6 +599,23 @@ func TestSubjectActions(t *testing.T) {
err = e.AssignSubjectRole(ctx, subjRes, role)
assert.NoError(t, err)

// Force a ZedToken to be created for the relevant resources. This creates a hierarchy where
// the tenRes tenant and otherRes tenant are both child resources of the parentRes tenant.
rels := []types.Relationship{
{
Resource: tenRes,
Relation: "parent",
Subject: parentRes,
},
{
Resource: otherRes,
Relation: "parent",
Subject: parentRes,
},
}
err = e.CreateRelationships(ctx, rels)
require.NoError(t, err)

type testInput struct {
resource types.Resource
action string
Expand Down
7 changes: 5 additions & 2 deletions internal/query/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/authzed/authzed-go/v1"
"github.com/nats-io/nats.go"
"go.infratographer.com/x/gidx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -43,6 +44,7 @@ type engine struct {
logger *zap.SugaredLogger
namespace string
client *authzed.Client
kv nats.KeyValue
schema []types.ResourceType
schemaPrefixMap map[string]types.ResourceType
schemaTypeMap map[string]types.ResourceType
Expand Down Expand Up @@ -89,13 +91,14 @@ func resourceHasRoleBindings(resType types.ResourceType) bool {
}

// NewEngine returns a new client for making permissions queries.
func NewEngine(namespace string, client *authzed.Client, options ...Option) Engine {
func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, options ...Option) (Engine, error) {
tracer := otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query")

e := &engine{
logger: zap.NewNop().Sugar(),
namespace: namespace,
client: client,
kv: kv,
tracer: tracer,
}

Expand All @@ -109,7 +112,7 @@ func NewEngine(namespace string, client *authzed.Client, options ...Option) Engi
e.cacheSchemaResources()
}

return e
return e, nil
}

// Option is a functional option for the engine
Expand Down
Loading

0 comments on commit 1d6d177

Please sign in to comment.