diff --git a/go/apps/api/integration/multi_node_ratelimiting/accuracy_test.go b/go/apps/api/integration/multi_node_ratelimiting/accuracy_test.go index 67e8d987d2..763295ce29 100644 --- a/go/apps/api/integration/multi_node_ratelimiting/accuracy_test.go +++ b/go/apps/api/integration/multi_node_ratelimiting/accuracy_test.go @@ -122,12 +122,12 @@ func TestAccuracy(t *testing.T) { upperLimit := int(maxAllowed * 1.2) lowerLimit := int(math.Min(maxAllowed*0.95, float64(total))) - t.Logf("total: %d, passed: %d, acceptable: [%d - %d]", total, passed, lowerLimit, upperLimit) + t.Logf("windows: %d, total: %d, passed: %d, acceptable: [%d - %d]", int(windows), total, passed, lowerLimit, upperLimit) // Verify results require.GreaterOrEqual(t, passed, lowerLimit, - "Success count should be >= lower limit") + "Passed count should be >= lower limit") require.LessOrEqual(t, passed, upperLimit, - "Success count should be <= upper limit") + "Passed count should be <= upper limit") t.Logf("balance: %+v", lb.GetMetrics()) diff --git a/go/apps/api/routes/register.go b/go/apps/api/routes/register.go index af507b62b3..0c2969dea3 100644 --- a/go/apps/api/routes/register.go +++ b/go/apps/api/routes/register.go @@ -44,11 +44,13 @@ func Register(srv *zen.Server, svc *Services) { srv.RegisterRoute( defaultMiddlewares, v2RatelimitLimit.New(v2RatelimitLimit.Services{ - Logger: svc.Logger, - DB: svc.Database, - Keys: svc.Keys, - Ratelimit: svc.Ratelimit, - Permissions: svc.Permissions, + Logger: svc.Logger, + DB: svc.Database, + Keys: svc.Keys, + Ratelimit: svc.Ratelimit, + Permissions: svc.Permissions, + RatelimitNamespaceByNameCache: svc.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: svc.Caches.RatelimitOverridesMatch, }), ) // v2/ratelimit.setOverride diff --git a/go/apps/api/routes/services.go b/go/apps/api/routes/services.go index 069f392e8b..b35039b8b4 100644 --- a/go/apps/api/routes/services.go +++ b/go/apps/api/routes/services.go @@ -1,6 +1,7 @@ package routes import ( + "github.com/unkeyed/unkey/go/internal/services/caches" "github.com/unkeyed/unkey/go/internal/services/keys" "github.com/unkeyed/unkey/go/internal/services/permissions" "github.com/unkeyed/unkey/go/internal/services/ratelimit" @@ -22,4 +23,5 @@ type Services struct { Permissions permissions.PermissionService Validator *validation.Validator Ratelimit ratelimit.Service + Caches caches.Caches } diff --git a/go/apps/api/routes/v2_ratelimit_limit/200_test.go b/go/apps/api/routes/v2_ratelimit_limit/200_test.go index ed841819df..e3ca2db720 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/200_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/200_test.go @@ -30,11 +30,13 @@ func TestLimitSuccessfully(t *testing.T) { require.NoError(t, err) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) diff --git a/go/apps/api/routes/v2_ratelimit_limit/400_test.go b/go/apps/api/routes/v2_ratelimit_limit/400_test.go index c6b02a3272..c3f4db162a 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/400_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/400_test.go @@ -20,11 +20,13 @@ func TestBadRequests(t *testing.T) { h := testutil.NewHarness(t) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) diff --git a/go/apps/api/routes/v2_ratelimit_limit/401_test.go b/go/apps/api/routes/v2_ratelimit_limit/401_test.go index 181b225457..089ff8784b 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/401_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/401_test.go @@ -13,11 +13,13 @@ func TestUnauthorizedAccess(t *testing.T) { h := testutil.NewHarness(t) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) diff --git a/go/apps/api/routes/v2_ratelimit_limit/403_test.go b/go/apps/api/routes/v2_ratelimit_limit/403_test.go index 6f20a5fc77..2dca19876c 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/403_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/403_test.go @@ -31,11 +31,13 @@ func TestWorkspacePermissions(t *testing.T) { require.NoError(t, err) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) diff --git a/go/apps/api/routes/v2_ratelimit_limit/404_test.go b/go/apps/api/routes/v2_ratelimit_limit/404_test.go index fe666385bb..1249d11b1b 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/404_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/404_test.go @@ -20,11 +20,13 @@ func TestNamespaceNotFound(t *testing.T) { h := testutil.NewHarness(t) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) diff --git a/go/apps/api/routes/v2_ratelimit_limit/accuracy_test.go b/go/apps/api/routes/v2_ratelimit_limit/accuracy_test.go index f478031d8e..cb57d91c7f 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/accuracy_test.go +++ b/go/apps/api/routes/v2_ratelimit_limit/accuracy_test.go @@ -58,11 +58,13 @@ func TestRateLimitAccuracy(t *testing.T) { h := testutil.NewHarness(t) route := handler.New(handler.Services{ - DB: h.DB, - Keys: h.Keys, - Logger: h.Logger, - Permissions: h.Permissions, - Ratelimit: h.Ratelimit, + DB: h.DB, + Keys: h.Keys, + Logger: h.Logger, + Permissions: h.Permissions, + Ratelimit: h.Ratelimit, + RatelimitNamespaceByNameCache: h.Caches.RatelimitNamespaceByName, + RatelimitOverrideMatchesCache: h.Caches.RatelimitOverridesMatch, }) h.Register(route) ctx := context.Background() diff --git a/go/apps/api/routes/v2_ratelimit_limit/handler.go b/go/apps/api/routes/v2_ratelimit_limit/handler.go index 0888ef9a7e..52380b2a3b 100644 --- a/go/apps/api/routes/v2_ratelimit_limit/handler.go +++ b/go/apps/api/routes/v2_ratelimit_limit/handler.go @@ -2,6 +2,8 @@ package v2RatelimitLimit import ( "context" + "database/sql" + "errors" "net/http" "time" @@ -9,6 +11,7 @@ import ( "github.com/unkeyed/unkey/go/internal/services/keys" "github.com/unkeyed/unkey/go/internal/services/permissions" "github.com/unkeyed/unkey/go/internal/services/ratelimit" + "github.com/unkeyed/unkey/go/pkg/cache" "github.com/unkeyed/unkey/go/pkg/db" "github.com/unkeyed/unkey/go/pkg/fault" "github.com/unkeyed/unkey/go/pkg/otel/logging" @@ -21,11 +24,13 @@ type Request = openapi.V2RatelimitLimitRequestBody type Response = openapi.V2RatelimitLimitResponseBody type Services struct { - Logger logging.Logger - Keys keys.KeyService - DB db.Database - Permissions permissions.PermissionService - Ratelimit ratelimit.Service + Logger logging.Logger + Keys keys.KeyService + DB db.Database + Permissions permissions.PermissionService + Ratelimit ratelimit.Service + RatelimitNamespaceByNameCache cache.Cache[db.FindRatelimitNamespaceByNameParams, db.RatelimitNamespace] + RatelimitOverrideMatchesCache cache.Cache[db.FindRatelimitOverrideMatchesParams, []db.RatelimitOverride] } // New creates a new route handler for ratelimits.limit @@ -53,10 +58,26 @@ func New(svc Services) zen.Route { ctx, span := tracing.Start(ctx, "FindRatelimitNamespaceByName") - namespace, err := db.Query.FindRatelimitNamespaceByName(ctx, svc.DB.RO(), db.FindRatelimitNamespaceByNameParams{ + findNamespaceArgs := db.FindRatelimitNamespaceByNameParams{ WorkspaceID: auth.AuthorizedWorkspaceID, Name: req.Namespace, + } + namespace, err := svc.RatelimitNamespaceByNameCache.SWR(ctx, findNamespaceArgs, func(ctx context.Context) (db.RatelimitNamespace, error) { + return db.Query.FindRatelimitNamespaceByName(ctx, svc.DB.RO(), findNamespaceArgs) + }, func(err error) cache.Op { + if err == nil { + // everything went well and we have a namespace response + return cache.WriteValue + } + if errors.Is(err, sql.ErrNoRows) { + // the response is empty, we need to store that the namespace does not exist + return cache.WriteNull + } + // this is a noop in the cache + return cache.Noop + }) + span.End() if err != nil { return db.HandleErr(err, "namespace") @@ -99,12 +120,25 @@ func New(svc Services) zen.Route { ) } - ctx, overridesSpan := tracing.Start(ctx, "FindRatelimitOverrideMatches") - overrides, err := db.Query.FindRatelimitOverrideMatches(ctx, svc.DB.RO(), db.FindRatelimitOverrideMatchesParams{ + findOverrideMatchesArgs := db.FindRatelimitOverrideMatchesParams{ WorkspaceID: auth.AuthorizedWorkspaceID, NamespaceID: namespace.ID, Identifier: req.Identifier, + } + ctx, overridesSpan := tracing.Start(ctx, "FindRatelimitOverrideMatches") + overrides, err := svc.RatelimitOverrideMatchesCache.SWR(ctx, findOverrideMatchesArgs, func(ctx context.Context) ([]db.RatelimitOverride, error) { + return db.Query.FindRatelimitOverrideMatches(ctx, svc.DB.RO(), findOverrideMatchesArgs) + }, func(err error) cache.Op { + if err == nil { + // everything went well and we have a namespace response + return cache.WriteValue + } + + // this is a noop in the cache + return cache.Noop + }) + overridesSpan.End() if err != nil { return db.HandleErr(err, "override") diff --git a/go/apps/api/run.go b/go/apps/api/run.go index 9aabf8721d..ed55353340 100644 --- a/go/apps/api/run.go +++ b/go/apps/api/run.go @@ -11,6 +11,7 @@ import ( "time" "github.com/unkeyed/unkey/go/apps/api/routes" + "github.com/unkeyed/unkey/go/internal/services/caches" "github.com/unkeyed/unkey/go/internal/services/keys" "github.com/unkeyed/unkey/go/internal/services/permissions" "github.com/unkeyed/unkey/go/internal/services/ratelimit" @@ -101,6 +102,14 @@ func Run(ctx context.Context, cfg Config) error { } } + caches, err := caches.New(caches.Config{ + Logger: logger, + Clock: clk, + }) + if err != nil { + return fmt.Errorf("unable to create caches: %w", err) + } + srv, err := zen.New(zen.Config{ InstanceID: cfg.ClusterInstanceID, Logger: logger, @@ -117,9 +126,10 @@ func Run(ctx context.Context, cfg Config) error { } keySvc, err := keys.New(keys.Config{ - Logger: logger, - DB: db, - Clock: clk, + Logger: logger, + DB: db, + Clock: clk, + KeyCache: caches.KeyByHash, }) if err != nil { return fmt.Errorf("unable to create key service: %w", err) @@ -156,6 +166,7 @@ func Run(ctx context.Context, cfg Config) error { DB: db, Logger: logger, Clock: clk, + Cache: caches.PermissionsByKeyId, }) if err != nil { return fmt.Errorf("unable to create permissions service: %w", err) @@ -169,6 +180,7 @@ func Run(ctx context.Context, cfg Config) error { Validator: validator, Ratelimit: rlSvc, Permissions: p, + Caches: caches, }) go func() { diff --git a/go/internal/services/caches/caches.go b/go/internal/services/caches/caches.go new file mode 100644 index 0000000000..718ce2ae17 --- /dev/null +++ b/go/internal/services/caches/caches.go @@ -0,0 +1,130 @@ +package caches + +import ( + "time" + + "github.com/unkeyed/unkey/go/pkg/cache" + "github.com/unkeyed/unkey/go/pkg/cache/middleware" + "github.com/unkeyed/unkey/go/pkg/clock" + "github.com/unkeyed/unkey/go/pkg/db" + "github.com/unkeyed/unkey/go/pkg/otel/logging" +) + +// Caches holds all cache instances used throughout the application. +// Each field represents a specialized cache for a specific data entity. +type Caches struct { + // RatelimitNamespaceByName caches ratelimit namespace lookups by name. + // Keys are db.FindRatelimitNamespaceByNameParams and values are db.RatelimitNamespace. + RatelimitNamespaceByName cache.Cache[db.FindRatelimitNamespaceByNameParams, db.RatelimitNamespace] + + // RatelimitOverridesMatch caches ratelimit override matches for specific criteria. + // Keys are db.FindRatelimitOverrideMatchesParams and values are slices of db.RatelimitOverride. + RatelimitOverridesMatch cache.Cache[db.FindRatelimitOverrideMatchesParams, []db.RatelimitOverride] + + // KeyByHash caches API key lookups by their hash. + // Keys are string (hash) and values are db.Key. + KeyByHash cache.Cache[string, db.Key] + + // PermissionsByKeyId caches permission strings for a given key ID. + // Keys are string (key ID) and values are slices of string representing permissions. + PermissionsByKeyId cache.Cache[string, []string] +} + +// Config defines the configuration options for initializing caches. +type Config struct { + // Logger is used for logging cache operations and errors. + Logger logging.Logger + + // Clock provides time functionality, allowing easier testing. + Clock clock.Clock +} + +// New creates and initializes all cache instances with appropriate settings. +// +// It configures each cache with specific freshness/staleness windows, size limits, +// resource names for tracing, and wraps them with tracing middleware. +// +// Parameters: +// - config: Configuration options including logger and clock implementations. +// +// Returns: +// - Caches: A struct containing all initialized cache instances. +// - error: An error if any cache failed to initialize. +// +// All caches are thread-safe and can be accessed concurrently. +// +// Example: +// +// logger := logging.NewLogger() +// clock := clock.RealClock{} +// +// caches, err := caches.New(caches.Config{ +// Logger: logger, +// Clock: clock, +// }) +// if err != nil { +// log.Fatalf("Failed to initialize caches: %v", err) +// } +// +// // Use the caches +// key, err := caches.KeyByHash.Get(ctx, "some-hash") +func New(config Config) (Caches, error) { + + ratelimitNamespace, err := cache.New(cache.Config[db.FindRatelimitNamespaceByNameParams, db.RatelimitNamespace]{ + Fresh: time.Minute, + Stale: time.Hour, + Logger: config.Logger, + MaxSize: 1_000, + Resource: "ratelimit_namespace_by_name", + Clock: config.Clock, + }) + if err != nil { + return Caches{}, err + } + + ratelimitOverridesMatch, err := cache.New(cache.Config[db.FindRatelimitOverrideMatchesParams, []db.RatelimitOverride]{ + Fresh: time.Minute, + Stale: time.Hour, + Logger: config.Logger, + MaxSize: 1_000, + Resource: "ratelimit_overrides", + Clock: config.Clock, + }) + if err != nil { + return Caches{}, err + } + + keyByHash, err := cache.New(cache.Config[string, db.Key]{ + Fresh: 10 * time.Second, + Stale: 60 * time.Second, + Logger: config.Logger, + MaxSize: 1_000_000, + + Resource: "key_by_hash", + Clock: config.Clock, + }) + if err != nil { + return Caches{}, err + } + + permissionsByKeyId, err := cache.New(cache.Config[string, []string]{ + Fresh: 10 * time.Second, + Stale: 60 * time.Second, + Logger: config.Logger, + MaxSize: 1_000_000, + + Resource: "permissions_by_key_id", + Clock: config.Clock, + }) + if err != nil { + return Caches{}, err + } + + return Caches{ + RatelimitNamespaceByName: middleware.WithTracing(ratelimitNamespace), + RatelimitOverridesMatch: middleware.WithTracing(ratelimitOverridesMatch), + KeyByHash: middleware.WithTracing(keyByHash), + PermissionsByKeyId: middleware.WithTracing(permissionsByKeyId), + }, nil + +} diff --git a/go/internal/services/caches/doc.go b/go/internal/services/caches/doc.go new file mode 100644 index 0000000000..ab3a9ce514 --- /dev/null +++ b/go/internal/services/caches/doc.go @@ -0,0 +1,16 @@ +// Package caches provides a centralized caching service for commonly accessed database entities. +// +// The caches package is designed to improve performance by reducing database load +// for frequently accessed data. It maintains in-memory caches with configurable +// freshness and staleness windows for various database entities used throughout +// the Unkey system. +// +// Common use cases: +// - Looking up ratelimit namespaces by name +// - Retrieving ratelimit overrides that match specific criteria +// - Getting API keys by their hash +// - Retrieving permissions associated with a key ID +// +// All caches are initialized with appropriate TTL settings and size limits, +// and include OpenTelemetry tracing for observability. +package caches diff --git a/go/internal/services/keys/service.go b/go/internal/services/keys/service.go index a7dcf0b269..626d63f5e4 100644 --- a/go/internal/services/keys/service.go +++ b/go/internal/services/keys/service.go @@ -1,19 +1,17 @@ package keys import ( - "time" - "github.com/unkeyed/unkey/go/pkg/cache" - cacheMiddleware "github.com/unkeyed/unkey/go/pkg/cache/middleware" "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/db" "github.com/unkeyed/unkey/go/pkg/otel/logging" ) type Config struct { - Logger logging.Logger - DB db.Database - Clock clock.Clock + Logger logging.Logger + DB db.Database + Clock clock.Clock + KeyCache cache.Cache[string, db.Key] } type service struct { @@ -25,22 +23,9 @@ type service struct { func New(config Config) (*service, error) { - keyCache, err := cache.New[string, db.Key](cache.Config[string, db.Key]{ - Fresh: 10 * time.Second, - Stale: 60 * time.Second, - Logger: config.Logger, - MaxSize: 1_000_000, - - Resource: "permissions", - Clock: config.Clock, - }) - if err != nil { - return nil, err - } - return &service{ logger: config.Logger, db: config.DB, - keyCache: cacheMiddleware.WithTracing(cache.Cache[string, db.Key](keyCache)), + keyCache: config.KeyCache, }, nil } diff --git a/go/internal/services/permissions/service.go b/go/internal/services/permissions/service.go index 20d6ff05b6..6401132113 100644 --- a/go/internal/services/permissions/service.go +++ b/go/internal/services/permissions/service.go @@ -1,10 +1,7 @@ package permissions import ( - "time" - "github.com/unkeyed/unkey/go/pkg/cache" - cacheMiddleware "github.com/unkeyed/unkey/go/pkg/cache/middleware" "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/db" "github.com/unkeyed/unkey/go/pkg/otel/logging" @@ -26,29 +23,15 @@ type Config struct { DB db.Database Logger logging.Logger Clock clock.Clock + Cache cache.Cache[string, []string] } func New(config Config) (*service, error) { - c, err := cache.New[string, []string](cache.Config[string, []string]{ - // How long the data is considered fresh - // Subsequent requests in this time will try to use the cache - Fresh: 10 * time.Second, - Stale: 60 * time.Second, - Logger: config.Logger, - MaxSize: 1_000_000, - - Resource: "permissions", - Clock: config.Clock, - }) - if err != nil { - return nil, err - } - return &service{ db: config.DB, logger: config.Logger, rbac: rbac.New(), - cache: cacheMiddleware.WithTracing(c), + cache: config.Cache, }, nil } diff --git a/go/pkg/cache/cache.go b/go/pkg/cache/cache.go index 408ab43815..23447fc3f8 100644 --- a/go/pkg/cache/cache.go +++ b/go/pkg/cache/cache.go @@ -8,7 +8,6 @@ import ( "time" "github.com/maypok86/otter" - "github.com/panjf2000/ants" "github.com/unkeyed/unkey/go/pkg/clock" "github.com/unkeyed/unkey/go/pkg/fault" "github.com/unkeyed/unkey/go/pkg/otel/logging" @@ -18,19 +17,17 @@ import ( ) type cache[K comparable, V any] struct { - otter otter.Cache[K, swrEntry[V]] - fresh time.Duration - stale time.Duration - // If a key is stale, its key will be put into this channel and a goroutine refreshes it in the background - refreshC chan K + otter otter.Cache[K, swrEntry[V]] + fresh time.Duration + stale time.Duration logger logging.Logger resource string clock clock.Clock + revalidateC chan func() + inflightMu sync.Mutex inflightRefreshes map[K]bool - - pool *ants.Pool } type Config[K comparable, V any] struct { @@ -62,14 +59,13 @@ func New[K comparable, V any](config Config[K, V]) (*cache[K, V], error) { return nil, err } - otter, err := builder.CollectStats().Cost(func(key K, value swrEntry[V]) uint32 { - return 1 - }).WithTTL(config.Stale).Build() - if err != nil { - return nil, err - } - - pool, err := ants.NewPool(10) + otter, err := builder. + CollectStats(). + Cost(func(key K, value swrEntry[V]) uint32 { + return 1 + }). + WithTTL(config.Stale). + Build() if err != nil { return nil, err } @@ -78,15 +74,22 @@ func New[K comparable, V any](config Config[K, V]) (*cache[K, V], error) { otter: otter, fresh: config.Fresh, stale: config.Stale, - refreshC: make(chan K, 1000), logger: config.Logger, resource: config.Resource, clock: config.Clock, - pool: pool, + revalidateC: make(chan func(), 1000), inflightMu: sync.Mutex{}, inflightRefreshes: make(map[K]bool), } + for range 10 { + go func() { + for revalidate := range c.revalidateC { + revalidate() + } + }() + } + err = c.registerMetrics() if err != nil { return nil, err @@ -234,10 +237,8 @@ func (c *cache[K, V]) Restore(ctx context.Context, b []byte) error { } now := c.clock.Now() for key, entry := range data { - if now.Before(entry.Fresh) { + if now.Before(entry.Fresh) || now.Before(entry.Stale) { c.Set(ctx, key, entry.Value) - } else if now.Before(entry.Stale) { - c.refreshC <- key } // If the entry is older than, we don't restore it } @@ -248,11 +249,12 @@ func (c *cache[K, V]) Clear(ctx context.Context) { c.otter.Clear() } -func (c *cache[K, V]) refresh( +func (c *cache[K, V]) revalidate( ctx context.Context, key K, refreshFromOrigin func(context.Context) (V, error), op func(error) Op, ) { + c.inflightMu.Lock() _, ok := c.inflightRefreshes[key] if ok { @@ -268,8 +270,11 @@ func (c *cache[K, V]) refresh( c.inflightMu.Unlock() }() + metrics.Cache.Revalidations.Add(ctx, 1, metric.WithAttributes(attribute.String("resource", c.resource))) v, err := refreshFromOrigin(ctx) - + if err != nil { + c.logger.Warn("failed to revalidate", "error", err.Error(), "key", key) + } switch op(err) { case WriteValue: c.Set(ctx, key, v) @@ -303,13 +308,10 @@ func (c *cache[K, V]) SWR( // We have data, but it's stale, so we refresh it in the background // but return the current value - err := c.pool.Submit(func() { - c.refresh(ctx, key, refreshFromOrigin, op) - }) - if err != nil { - c.logger.Error("failed to submit refresh task", "error", err.Error()) - } + c.revalidateC <- func() { + c.revalidate(ctx, key, refreshFromOrigin, op) + } return e.Value, nil } diff --git a/go/pkg/error_code/codes.go b/go/pkg/error_code/codes.go deleted file mode 100644 index 453abe94af..0000000000 --- a/go/pkg/error_code/codes.go +++ /dev/null @@ -1,92 +0,0 @@ -// Package errorcode provides error classification -package errorcode - -import ( - "encoding/json" - "fmt" -) - -// System represents the source system of an error -type System string - -// Supported system constants -const ( - SystemUnkey System = "UNKEY" // Unkey system errors - SystemAWS System = "AWS" // AWS-related errors - SystemGitHub System = "GITHUB" // GitHub-related errors -) - -// Namespace represents the functional area where an error occurred -type Namespace string - -// Supported namespace constants -const ( - // Unkey system namespaces - NamespaceDeployment Namespace = "DEPLOYMENT" // Deployment-related errors - NamespaceDatabase Namespace = "DATABASE" // Database-related errors - NamespaceKey Namespace = "KEY" // Key-related errors - - // AWS namespaces (to be defined) - - // GitHub namespaces (to be defined) -) - -// Subsystem represents a specific component within a namespace -type Subsystem string - -// base defines the core structure of an error -type base struct { - // System identifies the source system where the error originated - System System `json:"system"` - - // Namespace identifies the functional area where the error occurred - Namespace Namespace `json:"namespace"` - - // Name is the specific identifier for this error - Name string `json:"name"` - - // Description provides a human-readable explanation of the error - Description string `json:"description"` - - // PublicMeta contains additional public information about the error - // that can be safely exposed to end users - PublicMeta map[string]any `json:"publicMeta"` - - // InternalMeta contains sensitive internal information about the error - // that should not be exposed to end users - InternalMeta map[string]any `json:"internalMeta"` - - // Code is a formatted string combining System:Namespace:Name - // for easy error identification - Code string `json:"code"` - - // Cause is the original error that caused this. - Cause error `json:"cause"` -} - -// newBase creates a new base error with the given parameters -func newBase( - err error, - system System, - namespace Namespace, - name string, - description string, -) base { - return base{ - System: system, - Namespace: namespace, - Name: name, - Description: description, - PublicMeta: map[string]any{}, - InternalMeta: map[string]any{}, - Code: fmt.Sprintf("EID:%s:%s:%s", system, namespace, name), - Cause: err, - } - -} - -// Marshall converts the base error to JSON -func (e base) Marshall() ([]byte, error) { - return json.Marshal(e) - -} diff --git a/go/pkg/error_code/unkey_database_not_unique.go b/go/pkg/error_code/unkey_database_not_unique.go deleted file mode 100644 index 2879b562e0..0000000000 --- a/go/pkg/error_code/unkey_database_not_unique.go +++ /dev/null @@ -1,18 +0,0 @@ -package errorcode - -type UnkeyDatabaseNotUniqueError struct { - base -} - -func NewUnkeyDatabaseNotUniqueError(err error) UnkeyDatabaseNotUniqueError { - return UnkeyDatabaseNotUniqueError{ - base: newBase( - err, - SystemUnkey, - NamespaceKey, - "CONFLICT", - "The resource identifier must be unique.", - ), - } - -} diff --git a/go/pkg/error_code/unkey_database_transaction_timeout.go b/go/pkg/error_code/unkey_database_transaction_timeout.go deleted file mode 100644 index ba1575853d..0000000000 --- a/go/pkg/error_code/unkey_database_transaction_timeout.go +++ /dev/null @@ -1,18 +0,0 @@ -package errorcode - -type UnkeyDatabaseTransactionTimeoutError struct { - base -} - -func NewUnkeyDatabaseTransactionTimeoutError(err error) UnkeyDatabaseTransactionTimeoutError { - return UnkeyDatabaseTransactionTimeoutError{ - base: newBase( - err, - SystemUnkey, - NamespaceDatabase, - "TRANSACTION_TIMEOUT", - "The transaction reached its max timeout and aborted.", - ), - } - -} diff --git a/go/pkg/error_code/unkey_key_not_found.go b/go/pkg/error_code/unkey_key_not_found.go deleted file mode 100644 index 2e6e8c7e3a..0000000000 --- a/go/pkg/error_code/unkey_key_not_found.go +++ /dev/null @@ -1,18 +0,0 @@ -package errorcode - -type UnkeyKeyNotFoundError struct { - base -} - -func NewUnkeyKeyNotFoundError(err error) UnkeyKeyNotFoundError { - return UnkeyKeyNotFoundError{ - base: newBase( - err, - SystemUnkey, - NamespaceKey, - "NOT_FOUND", - "The requested key does not exist.", - ), - } - -} diff --git a/go/pkg/otel/metrics/metrics.go b/go/pkg/otel/metrics/metrics.go index 25015f1923..bac56b730b 100644 --- a/go/pkg/otel/metrics/metrics.go +++ b/go/pkg/otel/metrics/metrics.go @@ -146,6 +146,18 @@ var ( // return nil // }) Capacity Int64Observable + + // Revalidations tracks the number of times the cache has been revalidated. + // Use this to monitor cache refresh frequency and performance. + // + // Attributes: + // - resource (string): The type of resource being cached (e.g., "user_profile") + // + // Example: + // metrics.Caches.Revalidations.Add(ctx, 1, metric.WithAttributes( + // attribute.String("resource", "keys"), + // )) + Revalidations Int64Counter } // Cluster contains metrics related to cluster operations and status @@ -255,6 +267,10 @@ func Init(m metric.Meter) error { metric.WithDescription("Maximum number of items the cache can hold."), }, } + Cache.Revalidations, err = m.Int64Counter("cache_revalidations", metric.WithDescription("how many times the cache does background revalidation")) + if err != nil { + return err + } Cluster.Size = &int64ObservableGauge{ m: m, diff --git a/go/pkg/testutil/http.go b/go/pkg/testutil/http.go index df9d9c6278..87842b9256 100644 --- a/go/pkg/testutil/http.go +++ b/go/pkg/testutil/http.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/unkeyed/unkey/go/internal/services/caches" "github.com/unkeyed/unkey/go/internal/services/keys" "github.com/unkeyed/unkey/go/internal/services/permissions" "github.com/unkeyed/unkey/go/internal/services/ratelimit" @@ -34,6 +35,7 @@ type Harness struct { middleware []zen.Middleware DB db.Database + Caches caches.Caches Logger logging.Logger Keys keys.KeyService Permissions permissions.PermissionService @@ -57,6 +59,12 @@ func NewHarness(t *testing.T) *Harness { }) require.NoError(t, err) + caches, err := caches.New(caches.Config{ + Logger: logger, + Clock: clk, + }) + require.NoError(t, err) + srv, err := zen.New(zen.Config{ InstanceID: "test", Logger: logger, @@ -64,9 +72,10 @@ func NewHarness(t *testing.T) *Harness { require.NoError(t, err) keyService, err := keys.New(keys.Config{ - Logger: logger, - DB: db, - Clock: clk, + Logger: logger, + DB: db, + Clock: clk, + KeyCache: caches.KeyByHash, }) require.NoError(t, err) @@ -77,6 +86,7 @@ func NewHarness(t *testing.T) *Harness { DB: db, Logger: logger, Clock: clk, + Cache: caches.PermissionsByKeyId, }) require.NoError(t, err) @@ -105,6 +115,7 @@ func NewHarness(t *testing.T) *Harness { DB: db, seeder: seeder, Clock: clk, + Caches: caches, middleware: []zen.Middleware{ zen.WithTracing(),