Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions service/entityresolution/entityresolution.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package entityresolution

import (
"time"

"github.com/go-viper/mapstructure/v2"
"github.com/opentdf/platform/protocol/go/entityresolution"
"github.com/opentdf/platform/protocol/go/entityresolution/entityresolutionconnect"
claims "github.com/opentdf/platform/service/entityresolution/claims"
keycloak "github.com/opentdf/platform/service/entityresolution/keycloak"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel/trace"
)

type ERSConfig struct {
Mode string `mapstructure:"mode" json:"mode"`
Mode string `mapstructure:"mode" json:"mode"`
CacheExpiration string `mapstructure:"cacheexpiration" json:"cacheexpiration"`
CacheCost int64 `mapstructure:"cachecost" json:"cachecost"`
}

const (
Expand All @@ -32,6 +37,7 @@ func NewRegistration() *serviceregistry.Service[entityresolutionconnect.EntityRe
ConnectRPCFunc: entityresolutionconnect.NewEntityResolutionServiceHandler,
GRPCGatewayFunc: entityresolution.RegisterEntityResolutionServiceHandler,
RegisterFunc: func(srp serviceregistry.RegistrationParams) (entityresolutionconnect.EntityResolutionServiceHandler, serviceregistry.HandlerServer) {
srp.Logger.Info("Registering Entity Resolution Service")
var inputConfig ERSConfig

if err := mapstructure.Decode(srp.Config, &inputConfig); err != nil {
Expand All @@ -42,9 +48,28 @@ func NewRegistration() *serviceregistry.Service[entityresolutionconnect.EntityRe
claimsSVC.Tracer = srp.Tracer
return EntityResolution{EntityResolutionServiceHandler: claimsSVC}, claimsHandler
}
if inputConfig.CacheExpiration == "" {
inputConfig.CacheExpiration = "5m" // Default cache expiration
}
if inputConfig.CacheCost == 0 {
inputConfig.CacheCost = 100 // Default cache cost
}
exp, err := time.ParseDuration(inputConfig.CacheExpiration)
if err != nil {
srp.Logger.Error("Failed to parse cache expiration duration", "error", err)
panic(err)
}
ersCache, err := srp.NewCacheFunc(cache.Options{
Expiration: exp,
Cost: inputConfig.CacheCost,
})
if err != nil {
srp.Logger.Error("Failed to create cache for Entity Resolution Service", "error", err)
panic(err)
}

// Default to keycloak ERS
kcSVC, kcHandler := keycloak.RegisterKeycloakERS(srp.Config, srp.Logger)
kcSVC, kcHandler := keycloak.RegisterKeycloakERS(srp.Config, srp.Logger, ersCache)
kcSVC.Tracer = srp.Tracer

return EntityResolution{EntityResolutionServiceHandler: kcSVC, Tracer: srp.Tracer}, kcHandler
Expand Down
6 changes: 4 additions & 2 deletions service/entityresolution/keycloak/entity_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/opentdf/platform/protocol/go/entityresolution"
"github.com/opentdf/platform/service/entity"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/config"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -49,6 +50,7 @@ type KeycloakEntityResolutionService struct { //nolint:revive // Too late! Alrea
trace.Tracer
connector *KeyCloakConnector
connectorMu sync.Mutex
svcCache *cache.Cache
}

type KeycloakConfig struct { //nolint:revive // yeah but what if we want to embed multiple configs?
Expand All @@ -62,7 +64,7 @@ type KeycloakConfig struct { //nolint:revive // yeah but what if we want to embe
TokenBuffer time.Duration `mapstructure:"token_buffer_seconds" json:"token_buffer_seconds" default:"120s"`
}

func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger) (*KeycloakEntityResolutionService, serviceregistry.HandlerServer) {
func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger, svcCache *cache.Cache) (*KeycloakEntityResolutionService, serviceregistry.HandlerServer) {
var inputIdpConfig KeycloakConfig

if err := defaults.Set(&inputIdpConfig); err != nil {
Expand All @@ -73,7 +75,7 @@ func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger) (*K
panic(err)
}
logger.Debug("entity_resolution configuration", "config", inputIdpConfig)
keycloakSVC := &KeycloakEntityResolutionService{idpConfig: inputIdpConfig, logger: logger}
keycloakSVC := &KeycloakEntityResolutionService{idpConfig: inputIdpConfig, logger: logger, svcCache: svcCache}
return keycloakSVC, nil
}

Expand Down
6 changes: 4 additions & 2 deletions service/entityresolution/keycloak/v2/entity_resolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
entityresolutionV2 "github.com/opentdf/platform/protocol/go/entityresolution/v2"
ent "github.com/opentdf/platform/service/entity"
"github.com/opentdf/platform/service/logger"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/config"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -48,6 +49,7 @@ type EntityResolutionServiceV2 struct {
trace.Tracer
connector *Connector
connectorMu sync.Mutex
svcCache *cache.Cache
}

type Config struct {
Expand All @@ -61,7 +63,7 @@ type Config struct {
TokenBuffer time.Duration `mapstructure:"token_buffer_seconds" json:"token_buffer_seconds" default:"120s"`
}

func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger) (*EntityResolutionServiceV2, serviceregistry.HandlerServer) {
func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger, svcCache *cache.Cache) (*EntityResolutionServiceV2, serviceregistry.HandlerServer) {
var inputIdpConfig Config

if err := defaults.Set(&inputIdpConfig); err != nil {
Expand All @@ -72,7 +74,7 @@ func RegisterKeycloakERS(config config.ServiceConfig, logger *logger.Logger) (*E
panic(err)
}
logger.Debug("entity_resolution configuration", "config", inputIdpConfig)
keycloakSVC := &EntityResolutionServiceV2{idpConfig: inputIdpConfig, logger: logger}
keycloakSVC := &EntityResolutionServiceV2{idpConfig: inputIdpConfig, logger: logger, svcCache: svcCache}
return keycloakSVC, nil
}

Expand Down
28 changes: 26 additions & 2 deletions service/entityresolution/v2/entity_resolution.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package entityresolution

import (
"time"

"github.com/go-viper/mapstructure/v2"
ersV2 "github.com/opentdf/platform/protocol/go/entityresolution/v2"
"github.com/opentdf/platform/protocol/go/entityresolution/v2/entityresolutionv2connect"
claims "github.com/opentdf/platform/service/entityresolution/claims/v2"
keycloak "github.com/opentdf/platform/service/entityresolution/keycloak/v2"
"github.com/opentdf/platform/service/pkg/cache"
"github.com/opentdf/platform/service/pkg/serviceregistry"
"go.opentelemetry.io/otel/trace"
)

type ERSConfig struct {
Mode string `mapstructure:"mode" json:"mode"`
Mode string `mapstructure:"mode" json:"mode"`
CacheExpiration string `mapstructure:"cache_expiration" json:"cache_expiration"`
CacheCost int `mapstructure:"cache_cost" json:"cache_cost"`
}

const (
Expand Down Expand Up @@ -42,9 +47,28 @@ func NewRegistration() *serviceregistry.Service[entityresolutionv2connect.Entity
claimsSVC.Tracer = srp.Tracer
return EntityResolution{EntityResolutionServiceHandler: claimsSVC}, claimsHandler
}
if inputConfig.CacheExpiration == "" {
inputConfig.CacheExpiration = "5m" // Default cache expiration
}
if inputConfig.CacheCost == 0 {
inputConfig.CacheCost = 100 // Default cache cost
}
exp, err := time.ParseDuration(inputConfig.CacheExpiration)
if err != nil {
srp.Logger.Error("Failed to parse cache expiration duration", "error", err)
panic(err)
}
ersCache, err := srp.NewCacheFunc(cache.Options{
Expiration: exp,
Cost: int64(inputConfig.CacheCost),
})
if err != nil {
srp.Logger.Error("Failed to create cache for Entity Resolution Service", "error", err)
panic(err)
}

// Default to keycloak ERS
kcSVC, kcHandler := keycloak.RegisterKeycloakERS(srp.Config, srp.Logger)
kcSVC, kcHandler := keycloak.RegisterKeycloakERS(srp.Config, srp.Logger, ersCache)
kcSVC.Tracer = srp.Tracer

return EntityResolution{EntityResolutionServiceHandler: kcSVC, Tracer: srp.Tracer}, kcHandler
Expand Down
13 changes: 7 additions & 6 deletions service/pkg/server/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,14 @@ func startServices(ctx context.Context, params startServicesParams) (func(), err
svcLogger = svcLogger.With("version", svc.GetVersion())
}

// Check if the service supports and needs a cache
var cacheClient *cache.Cache
if cacheSvc, ok := svc.(serviceregistry.CacheSupportedService); ok {
cacheClient, err = cacheManager.NewCache(ns, svcLogger, *cacheSvc.CacheOptions())
// Function to create a cache given cache options
var createCacheClient func(cache.Options) (*cache.Cache, error) = func(options cache.Options) (*cache.Cache, error) {
slog.Info("creating cache client for", slog.String("namespace", ns), slog.String("service", svc.GetServiceDesc().ServiceName))
cacheClient, err := cacheManager.NewCache(fmt.Sprintf("%s-%s", ns, svc.GetServiceDesc().ServiceName), svcLogger, options)
if err != nil {
return func() {}, fmt.Errorf("issue creating cache client for %s: %w", ns, err)
return nil, fmt.Errorf("issue creating cache client for %s: %w", fmt.Sprintf("%s-%s", ns, svc.GetServiceDesc().ServiceName), err)
}
return cacheClient, nil
}

err = svc.Start(ctx, serviceregistry.RegistrationParams{
Expand All @@ -212,7 +213,7 @@ func startServices(ctx context.Context, params startServicesParams) (func(), err
RegisterReadinessCheck: health.RegisterReadinessCheck,
OTDF: otdf, // TODO: REMOVE THIS
Tracer: tracer,
Cache: cacheClient,
NewCacheFunc: createCacheClient,
KeyManagers: keyManagers,
})
if err != nil {
Expand Down
9 changes: 2 additions & 7 deletions service/pkg/serviceregistry/serviceregistry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type RegistrationParams struct {
Logger *logger.Logger
trace.Tracer

// Cache is the cache that can be used to cache data. This cache is scoped to the service
Cache *cache.Cache
// NewCacheFunc is a function that can be used to create a new cache instance for the service
NewCacheFunc func(cache.Options) (*cache.Cache, error)

KeyManagers []trust.KeyManager

Expand Down Expand Up @@ -90,11 +90,6 @@ type IService interface {
RegisterHTTPHandlers(context.Context, *runtime.ServeMux) error
}

// CacheSupportedService is implemented by services that support caching.
type CacheSupportedService interface {
CacheOptions() *cache.Options
}

// Service is a struct that holds the registration information for a service as well as the state
// of the service within the instance of the platform.
type Service[S any] struct {
Expand Down
Loading