Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions graphqlmetrics/core/metrics_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ type MetricsService struct {
// NewMetricsService creates a new metrics service
func NewMetricsService(logger *zap.Logger, chConn clickhouse.Conn, processorConfig ProcessorConfig) *MetricsService {
cacheConfig := &ristretto.Config[string, struct{}]{
MaxCost: 50_000,
NumCounters: 50_000 * 10,
BufferItems: 64,
MaxCost: 50_000,
NumCounters: 50_000 * 10,
BufferItems: 64,
IgnoreInternalCost: true,
}
opGuardCache, err := ristretto.NewCache[string, struct{}](cacheConfig)
if err != nil {
Expand Down
26 changes: 4 additions & 22 deletions router-tests/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,18 @@ import (
"regexp"
"strings"
"testing"
"time"
"unsafe"

"github.com/wundergraph/cosmo/router/core"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/wundergraph/cosmo/router-tests/testenv"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/config"
"github.com/wundergraph/cosmo/router/pkg/trace/tracetest"
"go.opentelemetry.io/otel/sdk/metric"
)

// TODO: This is a temporary workaround to get the size of the storeItem struct as ristretto.Cache adds an internal cost to the provided cost.
//
// ristrettoInternalCost is the size in bytes of a zero-valued storeItem[any],
// as reported by unsafe.Sizeof and converted to int64. Tests add it to the
// caller-supplied cost when computing the expected cost of a cached item.
const ristrettoInternalCost = int64(unsafe.Sizeof(storeItem[any]{}))

// storeItem is a type that represents the structure of an internal item in the ristretto.Cache.
//
// It exists only so that unsafe.Sizeof can be applied to it; the fields are
// never read or written, which is why each one carries a lint:ignore U1000
// (unused) directive.
// NOTE(review): presumably the field order and types must mirror ristretto's
// own internal item type for the computed size to match — verify against the
// ristretto version in use before changing anything here.
type storeItem[V any] struct {
//lint:ignore U1000 reason: temporary workaround
key uint64
//lint:ignore U1000 reason: temporary workaround
conflict uint64
//lint:ignore U1000 reason: temporary workaround
value V
//lint:ignore U1000 reason: temporary workaround
expiration time.Time
}

func TestPrometheus(t *testing.T) {
t.Parallel()
const employeesIDData = `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`
Expand Down Expand Up @@ -2985,7 +2967,7 @@ func TestPrometheus(t *testing.T) {
err error
metricFamilies []*io_prometheus_client.MetricFamily
// The base cost to store any item in the cache with the current configuration
baseCost = ristrettoInternalCost + 1
baseCost int64 = 1
)

metricReaderFiltered := metric.NewManualReader()
Expand Down Expand Up @@ -3313,7 +3295,7 @@ func TestPrometheus(t *testing.T) {
err error
metricFamilies []*io_prometheus_client.MetricFamily
// The base cost to store any item in the cache with the current configuration
baseCost = ristrettoInternalCost + 1
baseCost int64 = 1
)

metricReaderFiltered := metric.NewManualReader()
Expand Down Expand Up @@ -3640,7 +3622,7 @@ func TestPrometheus(t *testing.T) {
err error
metricFamilies []*io_prometheus_client.MetricFamily
// The base cost to store any item in the cache with the current configuration
baseCost = ristrettoInternalCost + 1
baseCost int64 = 1
)

metricReaderFiltered := metric.NewManualReader()
Expand Down
2 changes: 1 addition & 1 deletion router-tests/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestOperationCacheTelemetry(t *testing.T) {

const (
// The base cost to store any item in the cache with the current configuration
baseCost = ristrettoInternalCost + 1
baseCost = 1
employeesIDData = `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":10},{"id":11},{"id":12}]}}`
employeesTagData = `{"data":{"employees":[{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""},{"tag":""}]}}`
)
Expand Down
163 changes: 90 additions & 73 deletions router/core/graph_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,90 @@ type graphMux struct {
otelCacheMetrics *rmetric.CacheMetrics
}

// buildOperationCaches creates the operation-level caches of the graph mux
// (execution plan, persisted operation, normalization, validation and
// complexity calculation) according to the configuration held by srv.
//
// A cache is only created when its feature is enabled and, where a size is
// configurable, the configured size is greater than zero; otherwise the
// corresponding field on s is left untouched.
//
// Every cache sets IgnoreInternalCost so that ristretto does not add its own
// internal per-item cost on top of the cost supplied when an item is stored.
//
// It returns a wrapped error as soon as one cache cannot be created; caches
// created before the failure remain assigned on s.
func (s *graphMux) buildOperationCaches(srv *graphServer) (err error) {
	// Cache metrics are collected as soon as either exporter asks for them.
	cacheMetricsEnabled := srv.metricConfig.OpenTelemetry.GraphqlCache || srv.metricConfig.Prometheus.GraphqlCache

	// We create a new execution plan cache for each operation planner which is coupled to
	// the specific engine configuration. This is necessary because otherwise we would return invalid plans.
	//
	// Generating an execution plan can be quite expensive, so we cache it:
	// the input is hashed and the generated plan is stored under that hash, and
	// the next time we get the same input we just return the cached plan.
	// The engine first normalizes the operation and then hashes it, so we cache
	// by normalized input and don't have to worry about different inputs that
	// would generate the same execution plan.
	if srv.engineExecutionConfiguration.ExecutionPlanCacheSize > 0 {
		planCacheConfig := &ristretto.Config[uint64, *planWithMetaData]{
			Metrics:            cacheMetricsEnabled,
			MaxCost:            srv.engineExecutionConfiguration.ExecutionPlanCacheSize,
			NumCounters:        srv.engineExecutionConfiguration.ExecutionPlanCacheSize * 10,
			IgnoreInternalCost: true,
			BufferItems:        64,
		}
		s.planCache, err = ristretto.NewCache[uint64, *planWithMetaData](planCacheConfig)
		if err != nil {
			return fmt.Errorf("failed to create planner cache: %w", err)
		}
	}

	// The persisted operation cache is also required by automatic persisted
	// queries, so it is created when either feature is enabled. Its size is
	// fixed rather than configurable.
	if srv.engineExecutionConfiguration.EnablePersistedOperationsCache || srv.automaticPersistedQueriesConfig.Enabled {
		cacheSize := int64(1024)
		persistedOperationCacheConfig := &ristretto.Config[uint64, NormalizationCacheEntry]{
			MaxCost:            cacheSize,
			NumCounters:        cacheSize * 10,
			IgnoreInternalCost: true,
			BufferItems:        64,
		}

		// Surface construction errors like every other cache in this function
		// instead of silently continuing with a nil cache.
		s.persistedOperationCache, err = ristretto.NewCache[uint64, NormalizationCacheEntry](persistedOperationCacheConfig)
		if err != nil {
			return fmt.Errorf("failed to create persisted operation cache: %w", err)
		}
	}

	if srv.engineExecutionConfiguration.EnableNormalizationCache && srv.engineExecutionConfiguration.NormalizationCacheSize > 0 {
		normalizationCacheConfig := &ristretto.Config[uint64, NormalizationCacheEntry]{
			Metrics:            cacheMetricsEnabled,
			MaxCost:            srv.engineExecutionConfiguration.NormalizationCacheSize,
			NumCounters:        srv.engineExecutionConfiguration.NormalizationCacheSize * 10,
			IgnoreInternalCost: true,
			BufferItems:        64,
		}
		s.normalizationCache, err = ristretto.NewCache[uint64, NormalizationCacheEntry](normalizationCacheConfig)
		if err != nil {
			return fmt.Errorf("failed to create normalization cache: %w", err)
		}
	}

	if srv.engineExecutionConfiguration.EnableValidationCache && srv.engineExecutionConfiguration.ValidationCacheSize > 0 {
		validationCacheConfig := &ristretto.Config[uint64, bool]{
			Metrics:            cacheMetricsEnabled,
			MaxCost:            srv.engineExecutionConfiguration.ValidationCacheSize,
			NumCounters:        srv.engineExecutionConfiguration.ValidationCacheSize * 10,
			IgnoreInternalCost: true,
			BufferItems:        64,
		}
		s.validationCache, err = ristretto.NewCache[uint64, bool](validationCacheConfig)
		if err != nil {
			return fmt.Errorf("failed to create validation cache: %w", err)
		}
	}

	// The complexity calculation cache section is optional in the security
	// configuration, hence the nil check before reading its fields.
	complexityCache := srv.securityConfiguration.ComplexityCalculationCache
	if complexityCache != nil && complexityCache.Enabled && complexityCache.CacheSize > 0 {
		complexityCalculationCacheConfig := &ristretto.Config[uint64, ComplexityCacheEntry]{
			MaxCost:            complexityCache.CacheSize,
			NumCounters:        complexityCache.CacheSize * 10,
			IgnoreInternalCost: true,
			BufferItems:        64,
		}
		s.complexityCalculationCache, err = ristretto.NewCache[uint64, ComplexityCacheEntry](complexityCalculationCacheConfig)
		if err != nil {
			// Name the cache that actually failed; this used to report the
			// unrelated "query depth cache".
			return fmt.Errorf("failed to create complexity calculation cache: %w", err)
		}
	}

	return nil
}

// configureCacheMetrics sets up the cache metrics for this mux if enabled in the config.
func (s *graphMux) configureCacheMetrics(srv *graphServer, baseOtelAttributes []attribute.KeyValue) error {
if srv.metricConfig.OpenTelemetry.GraphqlCache {
Expand Down Expand Up @@ -457,76 +541,8 @@ func (s *graphServer) buildGraphMux(ctx context.Context,
return nil, err
}

// We create a new execution plan cache for each operation planner which is coupled to
// the specific engine configuration. This is necessary because otherwise we would return invalid plans.
//
// when an execution plan was generated, which can be quite expensive, we want to cache it
// this means that we can hash the input and cache the generated plan
// the next time we get the same input, we can just return the cached plan
// the engine is smart enough to first do normalization and then hash the input
// this means that we can cache the normalized input and don't have to worry about
// different inputs that would generate the same execution plan

if s.engineExecutionConfiguration.ExecutionPlanCacheSize > 0 {
planCacheConfig := &ristretto.Config[uint64, *planWithMetaData]{
Metrics: s.metricConfig.OpenTelemetry.GraphqlCache || s.metricConfig.Prometheus.GraphqlCache,
MaxCost: s.engineExecutionConfiguration.ExecutionPlanCacheSize,
NumCounters: s.engineExecutionConfiguration.ExecutionPlanCacheSize * 10,
BufferItems: 64,
}
gm.planCache, err = ristretto.NewCache[uint64, *planWithMetaData](planCacheConfig)
if err != nil {
return nil, fmt.Errorf("failed to create planner cache: %w", err)
}
}

if s.engineExecutionConfiguration.EnablePersistedOperationsCache || s.automaticPersistedQueriesConfig.Enabled {
cacheSize := int64(1024 * 10)
persistedOperationCacheConfig := &ristretto.Config[uint64, NormalizationCacheEntry]{
MaxCost: cacheSize,
NumCounters: cacheSize * 10,
BufferItems: 64,
}

gm.persistedOperationCache, _ = ristretto.NewCache[uint64, NormalizationCacheEntry](persistedOperationCacheConfig)
}

if s.engineExecutionConfiguration.EnableNormalizationCache && s.engineExecutionConfiguration.NormalizationCacheSize > 0 {
normalizationCacheConfig := &ristretto.Config[uint64, NormalizationCacheEntry]{
Metrics: s.metricConfig.OpenTelemetry.GraphqlCache || s.metricConfig.Prometheus.GraphqlCache,
MaxCost: s.engineExecutionConfiguration.NormalizationCacheSize,
NumCounters: s.engineExecutionConfiguration.NormalizationCacheSize * 10,
BufferItems: 64,
}
gm.normalizationCache, err = ristretto.NewCache[uint64, NormalizationCacheEntry](normalizationCacheConfig)
if err != nil {
return nil, fmt.Errorf("failed to create normalization cache: %w", err)
}
}

if s.engineExecutionConfiguration.EnableValidationCache && s.engineExecutionConfiguration.ValidationCacheSize > 0 {
validationCacheConfig := &ristretto.Config[uint64, bool]{
Metrics: s.metricConfig.OpenTelemetry.GraphqlCache || s.metricConfig.Prometheus.GraphqlCache,
MaxCost: s.engineExecutionConfiguration.ValidationCacheSize,
NumCounters: s.engineExecutionConfiguration.ValidationCacheSize * 10,
BufferItems: 64,
}
gm.validationCache, err = ristretto.NewCache[uint64, bool](validationCacheConfig)
if err != nil {
return nil, fmt.Errorf("failed to create validation cache: %w", err)
}
}

if s.securityConfiguration.ComplexityCalculationCache != nil && s.securityConfiguration.ComplexityCalculationCache.Enabled && s.securityConfiguration.ComplexityCalculationCache.CacheSize > 0 {
complexityCalculationCacheConfig := &ristretto.Config[uint64, ComplexityCacheEntry]{
MaxCost: s.securityConfiguration.ComplexityCalculationCache.CacheSize,
NumCounters: s.securityConfiguration.ComplexityCalculationCache.CacheSize * 10,
BufferItems: 64,
}
gm.complexityCalculationCache, err = ristretto.NewCache[uint64, ComplexityCacheEntry](complexityCalculationCacheConfig)
if err != nil {
return nil, fmt.Errorf("failed to create query depth cache: %w", err)
}
if err = gm.buildOperationCaches(s); err != nil {
return nil, err
}

if err = gm.configureCacheMetrics(s, baseOtelAttributes); err != nil {
Expand Down Expand Up @@ -554,9 +570,10 @@ func (s *graphServer) buildGraphMux(ctx context.Context,

if computeSha256 {
operationHashCacheConfig := &ristretto.Config[uint64, string]{
MaxCost: s.engineExecutionConfiguration.OperationHashCacheSize,
NumCounters: s.engineExecutionConfiguration.OperationHashCacheSize * 10,
BufferItems: 64,
MaxCost: s.engineExecutionConfiguration.OperationHashCacheSize,
NumCounters: s.engineExecutionConfiguration.OperationHashCacheSize * 10,
IgnoreInternalCost: true,
BufferItems: 64,
}
gm.operationHashCache, err = ristretto.NewCache[uint64, string](operationHashCacheConfig)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ func NewOperationsCache(cacheSize int64) (*OperationsCache, error) {

var err error
oc.Cache, err = ristretto.NewCache(&ristretto.Config[string, []byte]{
NumCounters: (cacheSize * 10) / persistentAverageCacheEntrySize,
MaxCost: cacheSize,
BufferItems: 64,
NumCounters: (cacheSize * 10) / persistentAverageCacheEntrySize,
MaxCost: cacheSize,
IgnoreInternalCost: true,
BufferItems: 64,
})
if err != nil {
return nil, fmt.Errorf("initializing operations cache: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,15 @@ type EngineExecutionConfiguration struct {
WebSocketClientPollTimeout time.Duration `envDefault:"1s" env:"ENGINE_WEBSOCKET_CLIENT_POLL_TIMEOUT" yaml:"websocket_client_poll_timeout,omitempty"`
WebSocketClientConnBufferSize int `envDefault:"128" env:"ENGINE_WEBSOCKET_CLIENT_CONN_BUFFER_SIZE" yaml:"websocket_client_conn_buffer_size,omitempty"`
WebSocketClientReadTimeout time.Duration `envDefault:"5s" env:"ENGINE_WEBSOCKET_CLIENT_READ_TIMEOUT" yaml:"websocket_client_read_timeout,omitempty"`
ExecutionPlanCacheSize int64 `envDefault:"10240" env:"ENGINE_EXECUTION_PLAN_CACHE_SIZE" yaml:"execution_plan_cache_size,omitempty"`
ExecutionPlanCacheSize int64 `envDefault:"1024" env:"ENGINE_EXECUTION_PLAN_CACHE_SIZE" yaml:"execution_plan_cache_size,omitempty"`
MinifySubgraphOperations bool `envDefault:"true" env:"ENGINE_MINIFY_SUBGRAPH_OPERATIONS" yaml:"minify_subgraph_operations"`
EnablePersistedOperationsCache bool `envDefault:"true" env:"ENGINE_ENABLE_PERSISTED_OPERATIONS_CACHE" yaml:"enable_persisted_operations_cache"`
EnableNormalizationCache bool `envDefault:"true" env:"ENGINE_ENABLE_NORMALIZATION_CACHE" yaml:"enable_normalization_cache"`
NormalizationCacheSize int64 `envDefault:"10240" env:"ENGINE_NORMALIZATION_CACHE_SIZE" yaml:"normalization_cache_size,omitempty"`
NormalizationCacheSize int64 `envDefault:"1024" env:"ENGINE_NORMALIZATION_CACHE_SIZE" yaml:"normalization_cache_size,omitempty"`
OperationHashCacheSize int64 `envDefault:"2048" env:"ENGINE_OPERATION_HASH_CACHE_SIZE" yaml:"operation_hash_cache_size,omitempty"`
ParseKitPoolSize int `envDefault:"16" env:"ENGINE_PARSEKIT_POOL_SIZE" yaml:"parsekit_pool_size,omitempty"`
EnableValidationCache bool `envDefault:"true" env:"ENGINE_ENABLE_VALIDATION_CACHE" yaml:"enable_validation_cache"`
ValidationCacheSize int64 `envDefault:"10240" env:"ENGINE_VALIDATION_CACHE_SIZE" yaml:"validation_cache_size,omitempty"`
ValidationCacheSize int64 `envDefault:"1024" env:"ENGINE_VALIDATION_CACHE_SIZE" yaml:"validation_cache_size,omitempty"`
ResolverMaxRecyclableParserSize int `envDefault:"32768" env:"ENGINE_RESOLVER_MAX_RECYCLABLE_PARSER_SIZE" yaml:"resolver_max_recyclable_parser_size,omitempty"`
EnableSubgraphFetchOperationName bool `envDefault:"true" env:"ENGINE_ENABLE_SUBGRAPH_FETCH_OPERATION_NAME" yaml:"enable_subgraph_fetch_operation_name"`
}
Expand All @@ -338,13 +338,13 @@ type SecurityConfiguration struct {
type QueryDepthConfiguration struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"SECURITY_QUERY_DEPTH_ENABLED"`
Limit int `yaml:"limit,omitempty" envDefault:"0" env:"SECURITY_QUERY_DEPTH_LIMIT"`
CacheSize int64 `yaml:"cache_size,omitempty" envDefault:"10240" env:"SECURITY_QUERY_DEPTH_CACHE_SIZE"`
CacheSize int64 `yaml:"cache_size,omitempty" envDefault:"1024" env:"SECURITY_QUERY_DEPTH_CACHE_SIZE"`
IgnorePersistedOperations bool `yaml:"ignore_persisted_operations,omitempty" envDefault:"false" env:"SECURITY_QUERY_DEPTH_IGNORE_PERSISTED_OPERATIONS"`
}

type ComplexityCalculationCache struct {
Enabled bool `yaml:"enabled" envDefault:"false" env:"SECURITY_COMPLEXITY_CACHE_ENABLED"`
CacheSize int64 `yaml:"size,omitempty" envDefault:"10240" env:"SECURITY_COMPLEXITY_CACHE_SIZE"`
CacheSize int64 `yaml:"size,omitempty" envDefault:"1024" env:"SECURITY_COMPLEXITY_CACHE_SIZE"`
}

type ComplexityLimits struct {
Expand Down
10 changes: 5 additions & 5 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1740,7 +1740,7 @@
},
"size": {
"type": "integer",
"default": 10240,
"default": 1024,
"description": "The size of the cache for the complexity calculation."
}
}
Expand Down Expand Up @@ -1863,7 +1863,7 @@
},
"cache_size": {
"type": "integer",
"default": 10240,
"default": 1024,
"description": "The size of the cache for query depth. If users set a max_query_depth, we cache the decision per query."
},
"ignore_persisted_operations": {
Expand Down Expand Up @@ -1992,7 +1992,7 @@
},
"execution_plan_cache_size": {
"type": "integer",
"default": 10240,
"default": 1024,
"description": "The size of the execution plan cache."
},
"operation_hash_cache_size": {
Expand All @@ -2017,7 +2017,7 @@
},
"normalization_cache_size": {
"type": "integer",
"default": 10240,
"default": 1024,
"description": "The size of the normalization cache."
},
"parsekit_pool_size": {
Expand All @@ -2037,7 +2037,7 @@
},
"validation_cache_size": {
"type": "integer",
"default": 10240,
"default": 1024,
"description": "The size of the validation cache."
},
"enable_subgraph_fetch_operation_name": {
Expand Down
6 changes: 3 additions & 3 deletions router/pkg/config/testdata/config_defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,15 @@
"WebSocketClientPollTimeout": 1000000000,
"WebSocketClientConnBufferSize": 128,
"WebSocketClientReadTimeout": 5000000000,
"ExecutionPlanCacheSize": 10240,
"ExecutionPlanCacheSize": 1024,
"MinifySubgraphOperations": true,
"EnablePersistedOperationsCache": true,
"EnableNormalizationCache": true,
"NormalizationCacheSize": 10240,
"NormalizationCacheSize": 1024,
"OperationHashCacheSize": 2048,
"ParseKitPoolSize": 16,
"EnableValidationCache": true,
"ValidationCacheSize": 10240,
"ValidationCacheSize": 1024,
"ResolverMaxRecyclableParserSize": 32768,
"EnableSubgraphFetchOperationName": true
},
Expand Down
4 changes: 2 additions & 2 deletions router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,11 @@
"MinifySubgraphOperations": true,
"EnablePersistedOperationsCache": true,
"EnableNormalizationCache": true,
"NormalizationCacheSize": 10240,
"NormalizationCacheSize": 1024,
"OperationHashCacheSize": 2048,
"ParseKitPoolSize": 16,
"EnableValidationCache": true,
"ValidationCacheSize": 10240,
"ValidationCacheSize": 1024,
"ResolverMaxRecyclableParserSize": 4096,
"EnableSubgraphFetchOperationName": false
},
Expand Down