Skip to content

Commit

Permalink
Watch for KV updates instead of reaching out for tokens (#255)
Browse files Browse the repository at this point in the history
This commit updates the query engine to watch KV updates rather than
reach out to the bucket for every permissions check. This is primarily
done to improve performance at the expense of some consistency (i.e.,
the time it takes for a permissions-api server to receive an update).

Signed-off-by: John Schaeffer <[email protected]>
  • Loading branch information
jnschaeffer authored May 14, 2024
1 parent 17e68d5 commit 3842697
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 35 deletions.
6 changes: 6 additions & 0 deletions cmd/createrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func createRole(ctx context.Context, cfg *config.AppConfig) {
logger.Fatalw("error creating engine", "error", err)
}

defer func() {
if err := engine.Stop(); err != nil {
logger.Errorw("error stopping engine", "error", err)
}
}()

resource, err := engine.NewResourceFromID(resourceID)
if err != nil {
logger.Fatalw("error creating resource", "error", err)
Expand Down
6 changes: 6 additions & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func serve(_ context.Context, cfg *config.AppConfig) {
logger.Fatalw("error creating engine", "error", err)
}

defer func() {
if err := engine.Stop(); err != nil {
logger.Errorw("error stopping engine", "error", err)
}
}()

srv, err := echox.NewServer(
logger.Desugar(),
echox.ConfigFromViper(viper.GetViper()),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/authzed/grpcutil v0.0.0-20240123194739-2ea1e3d2d98b
github.com/cockroachdb/cockroach-go/v2 v2.3.7
github.com/go-jose/go-jose/v4 v4.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/jackc/pgx/v5 v5.5.5
github.com/labstack/echo/v4 v4.11.4
github.com/nats-io/nats.go v1.34.1
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDa
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
Expand Down
5 changes: 5 additions & 0 deletions internal/query/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type Engine struct {
schema []types.ResourceType
}

// Stop does nothing but satisfies the Engine interface.
func (e *Engine) Stop() error {
return nil
}

// AssignSubjectRole does nothing but satisfies the Engine interface.
func (e *Engine) AssignSubjectRole(context.Context, types.Resource, types.Role) error {
args := e.Called()
Expand Down
2 changes: 1 addition & 1 deletion internal/query/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (e *engine) SubjectHasPermission(ctx context.Context, subject types.Resourc

defer span.End()

consistency, consName := e.determineConsistency(ctx, resource)
consistency, consName := e.determineConsistency(resource)
span.SetAttributes(
attribute.String(
"permissions.consistency",
Expand Down
4 changes: 4 additions & 0 deletions internal/query/relations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func testEngine(ctx context.Context, t *testing.T, namespace string, policy iapl
out, err := NewEngine(namespace, client, kv, store, WithPolicy(policy))
require.NoError(t, err)

t.Cleanup(func() {
out.Stop() //nolint:errcheck
})

return out.(*engine)
}

Expand Down
13 changes: 13 additions & 0 deletions internal/query/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/authzed/authzed-go/v1"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/nats-io/nats.go"
"go.infratographer.com/x/gidx"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -76,6 +77,8 @@ type Engine interface {
GetRoleBindingResource(ctx context.Context, rb types.Resource) (types.Resource, error)

AllActions() []string

Stop() error
}

type engine struct {
Expand All @@ -84,6 +87,8 @@ type engine struct {
namespace string
client *authzed.Client
kv nats.KeyValue
keyWatcher nats.KeyWatcher
zedTokenCache *expirable.LRU[string, string]
store storage.Storage
schema []types.ResourceType
schemaPrefixMap map[string]types.ResourceType
Expand Down Expand Up @@ -185,9 +190,17 @@ func NewEngine(namespace string, client *authzed.Client, kv nats.KeyValue, store
e.cacheSchemaResources()
}

if err := e.initZedTokenCache(); err != nil {
return nil, err
}

return e, nil
}

func (e *engine) Stop() error {
return e.keyWatcher.Stop()
}

// Option is a functional option for the engine
type Option func(*engine)

Expand Down
88 changes: 57 additions & 31 deletions internal/query/zedtokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"

pb "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/nats-io/nats.go"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
Expand All @@ -18,35 +19,64 @@ const (
consistencyAtLeastAsFresh = "at_least_as_fresh"
)

// getLatestZedToken attempts to get the latest ZedToken for the given resource ID.
func (e *engine) getLatestZedToken(ctx context.Context, resourceID string) (string, error) {
_, span := e.tracer.Start(
ctx,
"getLatestZedToken",
trace.WithAttributes(
attribute.String(
"permissions.resource",
resourceID,
),
),
)
// initZedTokenCache creates a new LRU cache that watches KV for ZedToken updates.
func (e *engine) initZedTokenCache() error {
status, err := e.kv.Status()
if err != nil {
return err
}

defer span.End()
ttl := status.TTL()

resp, err := e.kv.Get(resourceID)
keyWatcher, err := e.kv.WatchAll()
if err != nil {
// Only record this as an error if it wasn't a not found error.
if !errors.Is(err, nats.ErrKeyNotFound) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}

lru := expirable.NewLRU[string, string](0, nil, ttl)

e.keyWatcher = keyWatcher
e.zedTokenCache = lru

go func() {
for entry := range e.keyWatcher.Updates() {
if entry == nil {
continue
}

key := entry.Key()
value := string(entry.Value())

_, span := e.tracer.Start(
context.Background(),
"populateZedTokenCache",
trace.WithAttributes(
attribute.String(
"permissions.resource",
key,
),
),
)

e.zedTokenCache.Add(key, value)

span.End()
}
}()

return "", err
return nil
}

// getLatestZedToken attempts to get the latest ZedToken for the given resource ID.
func (e *engine) getLatestZedToken(resourceID string) (string, bool) {
resp, ok := e.zedTokenCache.Get(resourceID)
if !ok {
return "", false
}

zedToken := string(resp.Value())
zedToken := string(resp)

return zedToken, nil
return zedToken, true
}

// upsertZedToken updates the ZedToken at the given resource ID key with the provided ZedToken.
Expand Down Expand Up @@ -112,18 +142,14 @@ func (e *engine) updateRelationshipZedTokens(ctx context.Context, rels []types.R

// determineConsistency produces a consistency strategy based on whether a ZedToken exists for a
// given resource. If a ZedToken is available for the resource, at_least_as_fresh is used with the
// retrieved ZedToken. If no such token is found, or if there is an error reaching NATS, minimize_latency
// is used. This ensures that if NATS is not working or available for some reason, we can still make
// permissions checks (albeit in a degraded state).
func (e *engine) determineConsistency(ctx context.Context, resource types.Resource) (*pb.Consistency, string) {
// retrieved ZedToken. If no such token is found, minimize_latency is used. This ensures that if
// NATS is not working or available for some reason, we can still make permissions checks (albeit
// in a degraded state).
func (e *engine) determineConsistency(resource types.Resource) (*pb.Consistency, string) {
resourceID := resource.ID.String()

zedToken, err := e.getLatestZedToken(ctx, resourceID)
if err != nil {
if !errors.Is(err, nats.ErrKeyNotFound) {
e.logger.Warnw("error getting latest ZedToken - falling back to minimize_latency", "error", err.Error(), "resource_id", resourceID)
}

zedToken, ok := e.getLatestZedToken(resourceID)
if !ok {
consistency := &pb.Consistency{
Requirement: &pb.Consistency_MinimizeLatency{
MinimizeLatency: true,
Expand Down
14 changes: 12 additions & 2 deletions internal/query/zedtokens_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.infratographer.com/permissions-api/internal/testingx"
"go.infratographer.com/permissions-api/internal/types"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.infratographer.com/x/gidx"
Expand Down Expand Up @@ -45,10 +46,19 @@ func TestConsistency(t *testing.T) {
},
}

err := e.CreateRelationships(ctx, rels)
// Watch for updates to the key to avoid racing
kw, err := e.kv.Watch(tenantID.String(), nats.UpdatesOnly())
require.NoError(t, err)

defer kw.Stop() //nolint:errcheck

err = e.CreateRelationships(ctx, rels)

require.NoError(t, err)

// Wait until we know an update occurred
<-kw.Updates()

return ctx
},
CheckFn: func(ctx context.Context, t *testing.T, res testingx.TestResult[string]) {
Expand All @@ -67,7 +77,7 @@ func TestConsistency(t *testing.T) {
}

testFn := func(ctx context.Context, res types.Resource) testingx.TestResult[string] {
_, consistencyName := e.determineConsistency(ctx, res)
_, consistencyName := e.determineConsistency(res)

out := testingx.TestResult[string]{
Success: consistencyName,
Expand Down

0 comments on commit 3842697

Please sign in to comment.