Merged
Changes from 5 commits
21 changes: 21 additions & 0 deletions docs/Configuring.md
@@ -21,6 +21,7 @@ The platform leverages [viper](https://github.com/spf13/viper) to help load conf
- [Configuration in opentdf-example.yaml](#configuration-in-opentdf-exampleyaml)
- [Role Permissions](#role-permissions)
- [Managing Authorization Policy](#managing-authorization-policy)
- [Cache Configuration](#cache-configuration)

## Deployment Mode

@@ -387,3 +388,23 @@ server:
#### Managing Authorization Policy

Admins can manage the authorization policy directly in the YAML configuration file. For detailed configuration options, refer to the [Casbin documentation](https://casbin.org/docs/en/syntax-for-models).

## Cache Configuration

The platform supports a cache manager to improve performance for frequently accessed data. You can configure the cache backend and its resource usage.

Root level key `cache`

| Field | Description | Default |
|--------------------------|------------------------------------------------------------------|--------------|
| `driver`                 | Type of cache backend (currently only `ristretto` is supported)   | `ristretto`  |
| `ristretto.maxCost`      | Maximum cost of the cache, e.g. `100mb`, `1gb` (maximum `8gb`)     | `1gb`        |

Example:

```yaml
cache:
  driver: ristretto # Type of cache backend (currently only 'ristretto' is supported)
  ristretto:
    maxCost: 1gb # Maximum cost for the cache (default: 1gb)
```
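
For orientation, here is a minimal, hypothetical sketch of how this setting reaches the new cache manager at startup. It assumes the `maxCost` string has already been parsed into bytes (as `CacheRistrettoConfig.MaxCostBytes` does in the server changes below):

```go
package main

import (
	"log"

	"github.com/opentdf/platform/service/pkg/cache"
)

func main() {
	// Hypothetical value: "1gb" parsed into bytes.
	maxCostBytes := int64(1 * 1024 * 1024 * 1024)

	// NewCacheManager sizes the Ristretto backend from the configured budget.
	cacheManager, err := cache.NewCacheManager(maxCostBytes)
	if err != nil {
		log.Fatalf("creating cache manager: %v", err)
	}

	_ = cacheManager // in this PR, this is handed to server.NewOpenTDFServer(...)
}
```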
29 changes: 27 additions & 2 deletions service/internal/server/server.go
@@ -27,6 +27,8 @@ import (
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/logger/audit"
ctxAuth "github.com/opentdf/platform/service/pkg/auth"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/util"
"github.com/opentdf/platform/service/tracing"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
@@ -50,7 +52,10 @@ func (e Error) Error() string {
// Configurations for the server
type Config struct {
Auth auth.Config `mapstructure:"auth" json:"auth"`
GRPC GRPCConfig `mapstructure:"grpc" json:"grpc"`

Cache CacheConfig `mapstructure:"cache" json:"cache"`

GRPC GRPCConfig `mapstructure:"grpc" json:"grpc"`
// To Deprecate: Use the WithKey[X]Provider StartOptions to register trust providers.
CryptoProvider security.Config `mapstructure:"cryptoProvider" json:"cryptoProvider"`
TLS TLSConfig `mapstructure:"tls" json:"tls"`
@@ -86,6 +91,24 @@ func (c Config) LogValue() slog.Value {
return slog.GroupValue(group...)
}

// CacheRistrettoConfig supports human-friendly size strings like "1gb", "512mb", etc.
type CacheRistrettoConfig struct {
// MaxCost is the maximum cost of the cache; it can be a raw number of bytes or a size string like "1gb"
MaxCost string `mapstructure:"maxCost" json:"maxCost" default:"1gb"`
}

// MaxCostBytes parses MaxCost and returns the value in bytes.
// Supports suffixes: b, kb, mb, gb, tb (case-insensitive).
func (c CacheRistrettoConfig) MaxCostBytes() int64 {
const defaultCacheMaxCostBytes int64 = 1 * 1024 * 1024 * 1024 // 1GB
return util.RelativeFileSizeToBytes(c.MaxCost, defaultCacheMaxCostBytes) // Default to 1GB if parsing fails
}

type CacheConfig struct {
Driver string `mapstructure:"driver" json:"driver" default:"ristretto"`
RistrettoCache CacheRistrettoConfig `mapstructure:"ristretto" json:"ristretto"`
}

// GRPC Server specific configurations
type GRPCConfig struct {
// Enable reflection for grpc server (default: true)
Expand Down Expand Up @@ -130,6 +153,7 @@ type OpenTDFServer struct {
HTTPServer *http.Server
ConnectRPCInProcess *inProcessServer
ConnectRPC *ConnectRPC
CacheManager *cache.Manager

// To Deprecate: Use the TrustKeyIndex and TrustKeyManager instead
CryptoProvider *security.StandardCrypto
@@ -153,7 +177,7 @@ type inProcessServer struct {
*ConnectRPC
}

func NewOpenTDFServer(config Config, logger *logger.Logger) (*OpenTDFServer, error) {
func NewOpenTDFServer(config Config, logger *logger.Logger, cacheManager *cache.Manager) (*OpenTDFServer, error) {
var (
authN *auth.Authentication
err error
@@ -222,6 +246,7 @@ func NewOpenTDFServer(config Config, logger *logger.Logger) (*OpenTDFServer, err
AuthN: authN,
GRPCGatewayMux: grpcGatewayMux,
HTTPServer: httpServer,
CacheManager: cacheManager,
ConnectRPC: connectRPC,
ConnectRPCInProcess: &inProcessServer{
srv: memhttp.New(connectRPCIpc.Mux),
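
As a rough, hypothetical illustration of the `MaxCostBytes` helper added above (assuming `util.RelativeFileSizeToBytes` interprets the suffixes as 1024-based units and falls back to the supplied default when parsing fails):

```go
package server

import "fmt"

// Sketch only: the exact parsing behavior lives in util.RelativeFileSizeToBytes.
func ExampleCacheRistrettoConfig_MaxCostBytes() {
	// "512mb" is assumed to parse as 512 * 1024 * 1024 bytes.
	fmt.Println(CacheRistrettoConfig{MaxCost: "512mb"}.MaxCostBytes()) // expected: 536870912

	// An unparseable value falls back to the documented 1GB default.
	fmt.Println(CacheRistrettoConfig{MaxCost: "not-a-size"}.MaxCostBytes()) // expected: 1073741824
}
```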
130 changes: 130 additions & 0 deletions service/pkg/cache/cache.go
@@ -0,0 +1,130 @@
package cache

import (
"context"
"errors"
"strconv"
"time"

"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/store"
ristretto_store "github.com/eko/gocache/store/ristretto/v4"
"github.com/opentdf/platform/service/logger"
)

type Manager struct {
cache *cache.Cache[interface{}]
}

// Cache is a cache implementation using gocache
type Cache struct {
manager *Manager
serviceName string
cacheOptions Options
logger *logger.Logger
}

type Options struct {
Expiration time.Duration
Cost int64
}

// NewCacheManager creates a new cache Manager using Ristretto as the backend.
func NewCacheManager(maxCost int64) (*Manager, error) {
numCounters, bufferItems, err := EstimateRistrettoConfigParams(maxCost)
if err != nil {
return nil, err
}
config := &ristretto.Config{
NumCounters: numCounters, // number of keys to track frequency of (10x max items)
MaxCost: maxCost, // maximum cost of cache (e.g., 1<<20 for 1MB)
BufferItems: bufferItems, // number of keys per Get buffer.
}
store, err := ristretto.NewCache(config)
if err != nil {
return nil, err
}
ristrettoStore := ristretto_store.NewRistretto(store)
return &Manager{
cache: cache.New[interface{}](ristrettoStore),
}, nil
}

// NewCache creates a new Cache instance scoped to the given service name and options.
// Each service gets its own namespaced Cache so that cache usage stays under strict,
// per-service control, since caching can be expensive when used indiscriminately.
func (c *Manager) NewCache(serviceName string, log *logger.Logger, options Options) (*Cache, error) {
if log == nil {
return nil, errors.New("logger cannot be nil")
}
cache := &Cache{
manager: c,
serviceName: serviceName,
cacheOptions: options,
}
cache.logger = log.
With("subsystem", "cache").
With("serviceTag", cache.getServiceTag()).
With("expiration", options.Expiration.String()).
With("cost", strconv.FormatInt(options.Cost, 10))
cache.logger.Info("created cache")
return cache, nil
}

func (c *Cache) Get(ctx context.Context, key string) (interface{}, error) {
val, err := c.manager.cache.Get(ctx, c.getKey(key))
if err != nil {
// The gocache library reports a miss (and any other failure) as an error, so treat every error as a cache miss.
c.logger.Debug("cache miss", "key", key, "error", err)
return nil, err
}
c.logger.Debug("cache hit", "key", key)
return val, nil
}

func (c *Cache) Set(ctx context.Context, key string, object interface{}, tags []string) error {
tags = append(tags, c.getServiceTag())
opts := []store.Option{
store.WithTags(tags),
store.WithExpiration(c.cacheOptions.Expiration),
store.WithCost(c.cacheOptions.Cost),
}

err := c.manager.cache.Set(ctx, c.getKey(key), object, opts...)
if err != nil {
c.logger.Error("set error", "key", key, "error", err)
return err
}
c.logger.Debug("set cache", "key", key)
return nil
}

func (c *Cache) Invalidate(ctx context.Context) error {
err := c.manager.cache.Invalidate(ctx, store.WithInvalidateTags([]string{c.getServiceTag()}))
if err != nil {
c.logger.Error("invalidate error", "error", err)
return err
}
c.logger.Info("invalidate cache")
return nil
}

func (c *Cache) Delete(ctx context.Context, key string) error {
err := c.manager.cache.Delete(ctx, c.getKey(key))
if err != nil {
c.logger.Error("delete error", "key", key, "error", err)
return err
}
c.logger.Info("delete cache", "key", key)
return nil
}

func (c *Cache) getKey(key string) string {
return c.serviceName + ":" + key
}

func (c *Cache) getServiceTag() string {
return "svc:" + c.serviceName
}
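
A brief, hypothetical usage sketch of the API above: a service obtains a namespaced cache from the shared `Manager` and reads, writes, and invalidates through it (the service name, key, and sizes are illustrative):

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/opentdf/platform/service/logger"
	"github.com/opentdf/platform/service/pkg/cache"
)

func main() {
	ctx := context.Background()

	manager, err := cache.NewCacheManager(64 * 1024 * 1024) // 64MB cost budget
	if err != nil {
		log.Fatal(err)
	}

	lg, err := logger.NewLogger(logger.Config{Output: "stdout", Level: "error", Type: "json"})
	if err != nil {
		log.Fatal(err)
	}

	// Each service gets its own cache; keys are prefixed with the service name.
	policyCache, err := manager.NewCache("policy", lg, cache.Options{
		Expiration: 5 * time.Minute,
		Cost:       1,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Set tags the entry with the service tag so Invalidate can drop it later.
	if err := policyCache.Set(ctx, "attribute:123", "cached-value", nil); err != nil {
		log.Fatal(err)
	}

	// Ristretto admits writes asynchronously, so an immediate Get may still miss.
	if val, err := policyCache.Get(ctx, "attribute:123"); err == nil {
		log.Printf("cache hit: %v", val)
	}

	// Drop every entry written by this service.
	_ = policyCache.Invalidate(ctx)
}
```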
54 changes: 54 additions & 0 deletions service/pkg/cache/cache_test.go
@@ -0,0 +1,54 @@
package cache

import (
"testing"
"time"

"github.com/opentdf/platform/service/logger"
"github.com/stretchr/testify/require"
)

func TestNewCacheManager_ValidMaxCost(t *testing.T) {
maxCost := int64(1024 * 1024) // 1MB
manager, err := NewCacheManager(maxCost)
require.NoError(t, err)
require.NotNil(t, manager)
require.NotNil(t, manager.cache)
}

func TestNewCacheManager_InvalidMaxCost(t *testing.T) {
// Ristretto requires MaxCost > 0, so use 0 or negative
_, err := NewCacheManager(0)
require.Error(t, err)

_, err = NewCacheManager(-100)
require.Error(t, err)
}

func TestNewCacheManager_NewCacheIntegration(t *testing.T) {
maxCost := int64(1024 * 1024)
manager, err := NewCacheManager(maxCost)
require.NoError(t, err)
require.NotNil(t, manager)

// Use a simple logger stub
log, _ := newTestLogger()

options := Options{
Expiration: 1 * time.Minute,
Cost: 1,
}
cache, err := manager.NewCache("testService", log, options)
require.NoError(t, err)
require.NotNil(t, cache)
require.Equal(t, "testService", cache.serviceName)
require.Equal(t, options, cache.cacheOptions)
}

// newTestLogger returns a logger.Logger stub for testing.
func newTestLogger() (*logger.Logger, func()) {
// Build a minimal JSON logger that writes only error-level output to stdout.
// The second return value is a no-op cleanup func.
l, _ := logger.NewLogger(logger.Config{Output: "stdout", Level: "error", Type: "json"})
return l, func() {}
}
55 changes: 55 additions & 0 deletions service/pkg/cache/ristretto.go
@@ -0,0 +1,55 @@
package cache

import (
"fmt"
"runtime"
)

const (
// minimumNumCounters is the minimum number of counters for Ristretto cache
minimumNumCounters = 1000
// maxCostFactor is the maximum cost factor for Ristretto cache
maxCostFactor = 10 // 10x max items
// maxAllowedCost is the maximum allowed cost for Ristretto cache (8GB)
maxAllowedCost = 8 * 1024 * 1024 * 1024 // 8GB
)

// EstimateRistrettoConfigParams estimates Ristretto cache config parameters
// Uses a conservative default average item cost (1KB) if the true average is unknown.
func EstimateRistrettoConfigParams(maxCost int64) (int64, int64, error) {
if maxCost < 1 {
return 0, 0, fmt.Errorf("maxCost must be greater than 0, got %d", maxCost)
}
if maxCost > maxAllowedCost {
return 0, 0, fmt.Errorf("maxCost is unreasonably high (>%d): %d", maxAllowedCost, maxCost)
}
numCounters := ristrettoComputeNumCounters(maxCost)
bufferItems := ristrettoComputeBufferItems()
return numCounters, bufferItems, nil
}

// ristrettoComputeNumCounters calculates the recommended number of counters for the Ristretto cache
// based on the provided maximum cache cost (maxCost). It estimates the number of items by dividing
// maxCost by a default average item cost (1KB), then multiplies by a factor to determine the number
// of counters. The function ensures that the returned value is not less than a predefined minimum.
// This helps optimize cache performance and accuracy in eviction policies.
func ristrettoComputeNumCounters(maxCost int64) int64 {
const defaultAvgItemCost = 1024 // 1KB
numItems := maxCost / defaultAvgItemCost
if numItems < 1 {
numItems = 1
}
numCounters := numItems * maxCostFactor
if numCounters < minimumNumCounters {
return minimumNumCounters
}
return numCounters
}

// ristrettoComputeBufferItems calculates the number of buffer items for the Ristretto cache.
// It multiplies a constant number of buffer items per writer by the number of CPUs available.
// This helps optimize throughput for concurrent cache writes.
func ristrettoComputeBufferItems() int64 {
const bufferItemsPerWriter = 64
return bufferItemsPerWriter * int64(runtime.NumCPU())
}
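
To make the sizing arithmetic above concrete, a small worked example for a 64 MB cost budget (a sketch; the buffer-item count depends on the host's CPU count):

```go
package cache

import "fmt"

// Worked example of the estimates above for maxCost = 64 MB.
func ExampleEstimateRistrettoConfigParams() {
	numCounters, bufferItems, err := EstimateRistrettoConfigParams(64 * 1024 * 1024)
	if err != nil {
		panic(err)
	}

	// numItems    = 64 MB / 1 KB (default average item cost) = 65,536
	// numCounters = 65,536 * 10 (maxCostFactor)              = 655,360
	fmt.Println(numCounters) // 655360

	// bufferItems = 64 * runtime.NumCPU(), so the value varies by machine.
	fmt.Println(bufferItems > 0) // true
}
```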