Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache ZedTokens for resources using NATS #209

Merged
merged 4 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions chart/permissions-api/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
secret:
secretName: {{ . }}
{{- end }}
{{- with .Values.config.events.nats.credsSecretName }}
- name: nats-creds
secret:
secretName: {{ . }}
{{- end }}
{{- with .Values.config.spicedb.policyConfigMapName }}
- name: policy-file
configMap:
Expand All @@ -27,6 +32,10 @@
- name: spicedb-ca
mountPath: /etc/ssl/spicedb/
{{- end }}
{{- if .Values.config.events.nats.credsSecretName }}
- name: nats-creds
mountPath: /nats
{{- end }}
{{- if .Values.config.spicedb.policyConfigMapName }}
- name: policy-file
mountPath: /policy
Expand Down
2 changes: 1 addition & 1 deletion chart/permissions-api/templates/config-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ metadata:
service: server
data:
config.yaml: |
{{- pick .Values.config "server" "oidc" "spicedb" "tracing" | toYaml | nindent 4 }}
{{- pick .Values.config "server" "oidc" "spicedb" "tracing" "events" | toYaml | nindent 4 }}
3 changes: 3 additions & 0 deletions chart/permissions-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ config:
policyConfigMapName: ""

events:
# zedTokenBucket is the NATS bucket to use for caching ZedTokens
zedTokenBucket: ""

# topics are the list of topics to subscribe to
topics: []

Expand Down
16 changes: 15 additions & 1 deletion cmd/createrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.infratographer.com/x/events"
"go.infratographer.com/x/gidx"
"go.infratographer.com/x/viperx"

Expand Down Expand Up @@ -59,6 +60,16 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

var policy iapl.Policy

if cfg.SpiceDB.PolicyFile != "" {
Expand Down Expand Up @@ -86,7 +97,10 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("error parsing subject ID", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))
engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

resource, err := engine.NewResourceFromID(resourceID)
if err != nil {
Expand Down
30 changes: 30 additions & 0 deletions cmd/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cmd

import (
"errors"

"github.com/nats-io/nats.go"
"go.infratographer.com/x/events"

"go.infratographer.com/permissions-api/internal/config"
)

var (
errInvalidSource = errors.New("events source must be a NATS connection")
)

func initializeKV(cfg config.EventsConfig, eventsConn events.Connection) (nats.KeyValue, error) {
// While in theory the events package supports any kind of broker, in practice we only
// support NATS.
natsConn, ok := eventsConn.Source().(*nats.Conn)
if !ok {
return nil, errInvalidSource
}

js, err := natsConn.JetStream()
if err != nil {
return nil, err
}

return js.KeyValue(cfg.ZedTokenBucket)
}
17 changes: 16 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/spf13/viper"
"go.infratographer.com/x/echojwtx"
"go.infratographer.com/x/echox"
"go.infratographer.com/x/events"
"go.infratographer.com/x/otelx"
"go.infratographer.com/x/versionx"
"go.uber.org/zap"
Expand Down Expand Up @@ -38,6 +39,7 @@ func init() {
echox.MustViperFlags(v, serverCmd.Flags(), apiDefaultListen)
otelx.MustViperFlags(v, serverCmd.Flags())
echojwtx.MustViperFlags(v, serverCmd.Flags())
events.MustViperFlags(v, serverCmd.Flags(), appName)
}

func serve(ctx context.Context, cfg *config.AppConfig) {
Expand All @@ -51,6 +53,16 @@ func serve(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("unable to initialize spicedb client", "error", err)
}

eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

var policy iapl.Policy

if cfg.SpiceDB.PolicyFile != "" {
Expand All @@ -68,7 +80,10 @@ func serve(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy))
engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

srv, err := echox.NewServer(
logger.Desugar(),
Expand Down
18 changes: 13 additions & 5 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,22 @@ func worker(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("invalid spicedb policy", "error", err)
}

engine := query.NewEngine("infratographer", spiceClient, query.WithPolicy(policy), query.WithLogger(logger))

events, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
eventsConn, err := events.NewConnection(cfg.Events.Config, events.WithLogger(logger))
if err != nil {
logger.Fatalw("failed to initialize events", "error", err)
}

subscriber, err := pubsub.NewSubscriber(ctx, events, engine,
kv, err := initializeKV(cfg.Events, eventsConn)
if err != nil {
logger.Fatalw("failed to initialize KV", "error", err)
}

engine, err := query.NewEngine("infratographer", spiceClient, kv, query.WithPolicy(policy), query.WithLogger(logger))
if err != nil {
logger.Fatalw("error creating engine", "error", err)
}

subscriber, err := pubsub.NewSubscriber(ctx, eventsConn, engine,
pubsub.WithLogger(logger),
)
if err != nil {
Expand Down Expand Up @@ -140,7 +148,7 @@ func worker(ctx context.Context, cfg *config.AppConfig) {

defer cancel()

if err := events.Shutdown(ctx); err != nil {
if err := eventsConn.Shutdown(ctx); err != nil {
logger.Fatalw("failed to shutdown events gracefully", "error", "err")
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/authzed/authzed-go v0.10.1
github.com/authzed/grpcutil v0.0.0-20230908193239-4286bb1d6403
github.com/labstack/echo/v4 v4.11.3
github.com/nats-io/nats.go v1.31.0
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -58,7 +59,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nats-server/v2 v2.10.4 // indirect
github.com/nats-io/nats.go v1.31.0 // indirect
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
Expand Down
8 changes: 6 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (

// EventsConfig stores the configuration for a load-balancer-api events config
type EventsConfig struct {
events.Config `mapstructure:",squash"`
Topics []string
events.Config `mapstructure:",squash"`
Topics []string
ZedTokenBucket string
}

// AppConfig is the struct used for configuring the app
Expand All @@ -34,4 +35,7 @@ type AppConfig struct {
func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet) {
flags.StringSlice("events-topics", []string{}, "event topics to subscribe to")
viperx.MustBindFlag(v, "events.topics", flags.Lookup("events-topics"))

flags.String("events-zedtokenbucket", "", "NATS KV bucket to use for caching ZedTokens")
viperx.MustBindFlag(v, "events.zedtokenbucket", flags.Lookup("events-zedtokenbucket"))
}
23 changes: 15 additions & 8 deletions internal/query/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc

defer span.End()

consistency, consName := e.determineConsistency(ctx, resource)
span.SetAttributes(
attribute.String(
"permissions.consistency",
consName,
),
)

req := &pb.CheckPermissionRequest{
Consistency: &pb.Consistency{
Requirement: &pb.Consistency_FullyConsistent{
FullyConsistent: true,
},
},
Resource: resourceToSpiceDBRef(e.namespace, resource),
Permission: action,
Consistency: consistency,
Resource: resourceToSpiceDBRef(e.namespace, resource),
Permission: action,
Subject: &pb.SubjectReference{
Object: resourceToSpiceDBRef(e.namespace, subject),
},
Expand Down Expand Up @@ -256,13 +260,16 @@ func (e *engine) CreateRelationships(ctx context.Context, rels []types.Relations
Updates: relUpdates,
}

if _, err := e.client.WriteRelationships(ctx, request); err != nil {
resp, err := e.client.WriteRelationships(ctx, request)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

return err
}

e.updateRelationshipZedTokens(ctx, rels, resp.WrittenAt.Token)

return nil
}

Expand Down
42 changes: 39 additions & 3 deletions internal/query/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,20 @@ import (

pb "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/authzed/authzed-go/v1"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.infratographer.com/x/gidx"
"go.infratographer.com/x/testing/eventtools"

"go.infratographer.com/permissions-api/internal/iapl"
"go.infratographer.com/permissions-api/internal/spicedbx"
"go.infratographer.com/permissions-api/internal/testingx"
"go.infratographer.com/permissions-api/internal/types"
)

func testEngine(ctx context.Context, t *testing.T, namespace string) Engine {
func testEngine(ctx context.Context, t *testing.T, namespace string) *engine {
config := spicedbx.Config{
Endpoint: "spicedb:50051",
Key: "infradev",
Expand All @@ -36,13 +38,26 @@ func testEngine(ctx context.Context, t *testing.T, namespace string) Engine {
_, err = client.WriteSchema(ctx, request)
require.NoError(t, err)

natsSrv, err := eventtools.NewNatsServer()
require.NoError(t, err)

kvCfg := nats.KeyValueConfig{
Bucket: "zedtokens",
}

kv, err := natsSrv.JetStream.CreateKeyValue(&kvCfg)
require.NoError(t, err)

t.Cleanup(func() {
cleanDB(ctx, t, client, namespace)
})

out := NewEngine(namespace, client, WithPolicy(policy))
// We call the constructor here to ensure the engine is created appropriately, but
// then return the underlying type so we can do testing with it.
out, err := NewEngine(namespace, client, kv, WithPolicy(policy))
require.NoError(t, err)

return out
return out.(*engine)
}

func testPolicy() iapl.Policy {
Expand Down Expand Up @@ -557,6 +572,10 @@ func TestSubjectActions(t *testing.T) {
ctx := context.Background()
e := testEngine(ctx, t, namespace)

parentID, err := gidx.NewID("tnntten")
require.NoError(t, err)
parentRes, err := e.NewResourceFromID(parentID)
require.NoError(t, err)
tenID, err := gidx.NewID("tnntten")
require.NoError(t, err)
tenRes, err := e.NewResourceFromID(tenID)
Expand All @@ -580,6 +599,23 @@ func TestSubjectActions(t *testing.T) {
err = e.AssignSubjectRole(ctx, subjRes, role)
assert.NoError(t, err)

// Force a ZedToken to be created for the relevant resources. This creates a hierarchy where
// the tenRes tenant and otherRes tenant are both child resources of the parentRes tenant.
rels := []types.Relationship{
{
Resource: tenRes,
Relation: "parent",
Subject: parentRes,
},
{
Resource: otherRes,
Relation: "parent",
Subject: parentRes,
},
}
err = e.CreateRelationships(ctx, rels)
require.NoError(t, err)

type testInput struct {
resource types.Resource
action string
Expand Down
7 changes: 5 additions & 2 deletions internal/query/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/authzed/authzed-go/v1"
"github.com/nats-io/nats.go"
"go.infratographer.com/x/gidx"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -43,6 +44,7 @@ type engine struct {
logger *zap.SugaredLogger
namespace string
client *authzed.Client
kv nats.KeyValue
schema []types.ResourceType
schemaPrefixMap map[string]types.ResourceType
schemaTypeMap map[string]types.ResourceType
Expand Down Expand Up @@ -89,13 +91,14 @@ func resourceHasRoleBindings(resType types.ResourceType) bool {
}

// NewEngine returns a new client for making permissions queries.
func NewEngine(namespace string, client *authzed.Client, options ...Option) Engine {
func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, options ...Option) (Engine, error) {
tracer := otel.GetTracerProvider().Tracer("go.infratographer.com/permissions-api/internal/query")

e := &engine{
logger: zap.NewNop().Sugar(),
namespace: namespace,
client: client,
kv: kv,
tracer: tracer,
}

Expand All @@ -109,7 +112,7 @@ func NewEngine(namespace string, client *authzed.Client, options ...Option) Engi
e.cacheSchemaResources()
}

return e
return e, nil
}

// Option is a functional option for the engine
Expand Down
Loading
Loading