diff --git a/config/semantic-cache/config.hybrid.yaml b/config/semantic-cache/config.hybrid.yaml index dc63bf8304..a04f3a7685 100644 --- a/config/semantic-cache/config.hybrid.yaml +++ b/config/semantic-cache/config.hybrid.yaml @@ -16,8 +16,110 @@ semantic_cache: hnsw_m: 16 # Number of bi-directional links hnsw_ef_construction: 200 # Construction quality parameter - # Milvus configuration file path - backend_config_path: "config/semantic-cache/milvus.yaml" + # Milvus configuration example + milvus: + # Milvus connection settings + connection: + # Milvus server host (change for production deployment) + host: "localhost" # For production: use your Milvus cluster endpoint + # Milvus server port + port: 19530 # Standard Milvus port + # Database name (optional, defaults to "default") + database: "semantic_router_cache" + # Connection timeout in seconds + timeout: 30 + # Authentication (enable for production) + auth: + enabled: false # Set to true for production + username: "" # Your Milvus username + password: "" # Your Milvus password + # TLS/SSL configuration (recommended for production) + tls: + enabled: false # Set to true for secure connections + cert_file: "" # Path to client certificate + key_file: "" # Path to client private key + ca_file: "" # Path to CA certificate + # Collection settings + collection: + # Name of the collection to store cache entries + name: "semantic_cache" + # Description of the collection + description: "Semantic cache for LLM request-response pairs" + # Vector field configuration + vector_field: + # Name of the vector field + name: "embedding" + # Dimension of the embeddings (auto-detected from model at runtime) + dimension: 384 # This value is ignored - dimension is auto-detected from the embedding model + # Metric type for similarity calculation + metric_type: "IP" # Inner Product (cosine similarity for normalized vectors) + # Index configuration for the vector field + index: + # Index type (HNSW is recommended for most use cases) + 
type: "HNSW" + # Index parameters + params: + M: 16 # Number of bi-directional links for each node + efConstruction: 64 # Search scope during index construction + # Search configuration + search: + # Search parameters + params: + ef: 64 # Search scope during search (should be >= topk) + # Number of top results to retrieve for similarity comparison + topk: 10 + # Consistency level for search operations + consistency_level: "Session" # Options: Strong, Session, Bounded, Eventually + # Performance and resource settings + performance: + # Connection pool settings + connection_pool: + # Maximum number of connections in the pool + max_connections: 10 + # Maximum idle connections + max_idle_connections: 5 + # Connection timeout for acquiring from pool + acquire_timeout: 5 + # Batch operation settings + batch: + # Maximum batch size for insert operations + insert_batch_size: 1000 + # Batch timeout in seconds + timeout: 30 + # Data management + data_management: + # Automatic data expiration (TTL) settings + ttl: + # Enable automatic TTL-based cleanup (requires TTL to be set in main config) + enabled: true + # Field name to store timestamp for TTL calculation + timestamp_field: "timestamp" + # Cleanup interval in seconds (how often to run cleanup) + cleanup_interval: 3600 # 1 hour + # Compaction settings + compaction: + # Enable automatic compaction + enabled: true + # Compaction interval in seconds + interval: 86400 # 24 hours + # Logging and monitoring + logging: + # Log level for Milvus client operations (debug, info, warn, error) + level: "info" + # Enable query/search logging for debugging + enable_query_log: false + # Enable performance metrics collection + enable_metrics: true + # Development and debugging settings + development: + # Drop collection on startup (WARNING: This will delete all cached data) + drop_collection_on_startup: true # Enable for development to test dynamic dimensions + # Create collection if it doesn't exist + auto_create_collection: true + # 
Print detailed error messages + verbose_errors: true + # (Deprecated) Or you can set up the milvus connection using the below config: + # backend_config_path: "config/semantic-cache/milvus.yaml" tools: enabled: true diff --git a/config/semantic-cache/config.redis.yaml b/config/semantic-cache/config.redis.yaml index 9db6f0f907..1b856ba96c 100644 --- a/config/semantic-cache/config.redis.yaml +++ b/config/semantic-cache/config.redis.yaml @@ -8,7 +8,72 @@ semantic_cache: backend_type: "redis" # Using Redis vector database for semantic cache similarity_threshold: 0.80 # Global threshold (lowered for better matching) ttl_seconds: 3600 - backend_config_path: "config/semantic-cache/redis.yaml" + + # Redis configuration example + redis: + # Connection Settings + connection: + # Redis server host (change for production deployment) + host: "localhost" # For production: use your Redis cluster endpoint + # Redis server port + port: 6379 # Standard Redis port + # Database number (0-15 for standard Redis) + database: 0 + # Password for authentication (leave empty if no auth required) + password: "" + # Connection timeout in seconds + timeout: 30 + # TLS/SSL configuration (recommended for production) + tls: + enabled: false # Set to true for secure connections + cert_file: "" # Path to client certificate + key_file: "" # Path to client private key + ca_file: "" # Path to CA certificate + # Index settings for vector search + index: + # Name of the search index + name: "semantic_cache_idx" + # Key prefix for documents in this index + prefix: "doc:" + # Vector field configuration + vector_field: + # Name of the vector field + name: "embedding" + # Dimension of the embeddings (auto-detected from model at runtime) + dimension: 384 # This value is ignored - dimension is auto-detected from the embedding model + # Distance metric for similarity calculation + # Options: COSINE (cosine similarity), L2 (Euclidean distance), IP (inner product) + metric_type: "COSINE" # COSINE is recommended 
for semantic similarity + # Index type and parameters + # Options: HNSW (Hierarchical Navigable Small World) or FLAT (brute force) + index_type: "HNSW" # HNSW is recommended for performance + # Index parameters (only used when index_type is HNSW) + params: + M: 16 # Number of bi-directional links per node (default: 16) + efConstruction: 64 # Size of dynamic candidate list during construction (default: 64) + # Search configuration + search: + # Number of top results to retrieve for similarity comparison + topk: 1 # We only need the most similar entry for cache lookup + # Logging and monitoring + logging: + # Log level for Redis client operations (debug, info, warn, error) + level: "info" + # Enable query/search logging for debugging + enable_query_log: false + # Enable performance metrics collection + enable_metrics: true + # Development and debugging settings + development: + # Drop index on startup (WARNING: This will delete all cached data) + drop_index_on_startup: true # Enable for development to test dynamic dimensions + # Create index if it doesn't exist + auto_create_index: true + # Print detailed error messages + verbose_errors: true + # (Deprecated) Or you can set up the redis connection using the below config: + # backend_config_path: "config/semantic-cache/redis.yaml" + # Embedding model for semantic similarity matching # Options: "bert" (fast, 384-dim), "qwen3" (high quality, 1024-dim, 32K context), "gemma" (balanced, 768-dim, 8K context) # Default: "bert" (fastest, lowest memory) diff --git a/src/semantic-router/pkg/cache/cache_factory.go b/src/semantic-router/pkg/cache/cache_factory.go index 36cc47a1cf..0c9b4304b6 100644 --- a/src/semantic-router/pkg/cache/cache_factory.go +++ b/src/semantic-router/pkg/cache/cache_factory.go @@ -41,38 +41,77 @@ func NewCacheBackend(config CacheConfig) (CacheBackend, error) { return NewInMemoryCache(options), nil case MilvusCacheType: - logging.Debugf("Creating Milvus cache backend - ConfigPath: %s, TTL: %ds, Threshold: 
%.3f", - config.BackendConfigPath, config.TTLSeconds, config.SimilarityThreshold) - options := MilvusCacheOptions{ - Enabled: config.Enabled, - SimilarityThreshold: config.SimilarityThreshold, - TTLSeconds: config.TTLSeconds, - ConfigPath: config.BackendConfigPath, + var options MilvusCacheOptions + if config.Milvus != nil { + logging.Debugf("Creating Milvus cache backend - Config: %v, TTL: %ds, Threshold: %.3f", + config.Milvus, config.TTLSeconds, config.SimilarityThreshold) + options = MilvusCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + Config: config.Milvus, + } + } else { + logging.Debugf("(Deprecated) Creating Milvus cache backend - ConfigPath: %s, TTL: %ds, Threshold: %.3f", + config.BackendConfigPath, config.TTLSeconds, config.SimilarityThreshold) + options = MilvusCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + ConfigPath: config.BackendConfigPath, + } } return NewMilvusCache(options) case RedisCacheType: - logging.Debugf("Creating Redis cache backend - ConfigPath: %s, TTL: %ds, Threshold: %.3f", - config.BackendConfigPath, config.TTLSeconds, config.SimilarityThreshold) - options := RedisCacheOptions{ - Enabled: config.Enabled, - SimilarityThreshold: config.SimilarityThreshold, - TTLSeconds: config.TTLSeconds, - ConfigPath: config.BackendConfigPath, + var options RedisCacheOptions + if config.Redis != nil { + logging.Debugf("Creating Redis cache backend - Config: %v, TTL: %ds, Threshold: %.3f", + config.Redis, config.TTLSeconds, config.SimilarityThreshold) + options = RedisCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + Config: config.Redis, + } + } else { + logging.Debugf("(Deprecated) Creating Redis cache backend - ConfigPath: %s, TTL: %ds, Threshold: %.3f", + config.BackendConfigPath, config.TTLSeconds, 
config.SimilarityThreshold) + options = RedisCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + ConfigPath: config.BackendConfigPath, + } } return NewRedisCache(options) case HybridCacheType: - logging.Debugf("Creating Hybrid cache backend - MaxMemory: %d, TTL: %ds, Threshold: %.3f", - config.MaxMemoryEntries, config.TTLSeconds, config.SimilarityThreshold) - options := HybridCacheOptions{ - Enabled: config.Enabled, - SimilarityThreshold: config.SimilarityThreshold, - TTLSeconds: config.TTLSeconds, - MaxMemoryEntries: config.MaxMemoryEntries, - HNSWM: config.HNSWM, - HNSWEfConstruction: config.HNSWEfConstruction, - MilvusConfigPath: config.BackendConfigPath, + var options HybridCacheOptions + if config.Milvus != nil { + logging.Debugf("Creating Hybrid cache backend - Config: %v, TTL: %ds, Threshold: %.3f", + config.Milvus, config.TTLSeconds, config.SimilarityThreshold) + options = HybridCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + MaxMemoryEntries: config.MaxMemoryEntries, + HNSWM: config.HNSWM, + HNSWEfConstruction: config.HNSWEfConstruction, + Milvus: config.Milvus, + } + } else { + logging.Debugf("(Deprecated) Creating Hybrid cache backend - MaxMemory: %d, TTL: %ds, Threshold: %.3f", + config.MaxMemoryEntries, config.TTLSeconds, config.SimilarityThreshold) + options = HybridCacheOptions{ + Enabled: config.Enabled, + SimilarityThreshold: config.SimilarityThreshold, + TTLSeconds: config.TTLSeconds, + MaxMemoryEntries: config.MaxMemoryEntries, + HNSWM: config.HNSWM, + HNSWEfConstruction: config.HNSWEfConstruction, + MilvusConfigPath: config.BackendConfigPath, + } } return NewHybridCache(options) @@ -112,25 +151,32 @@ func ValidateCacheConfig(config CacheConfig) error { return fmt.Errorf("unsupported eviction_policy: %s", config.EvictionPolicy) } case MilvusCacheType: - if config.BackendConfigPath == "" { - 
return fmt.Errorf("backend_config_path is required for Milvus cache backend") - } - // Ensure the Milvus configuration file exists - if _, err := os.Stat(config.BackendConfigPath); os.IsNotExist(err) { - logging.Debugf("Milvus config file not found: %s", config.BackendConfigPath) - return fmt.Errorf("milvus config file not found: %s", config.BackendConfigPath) + if config.Milvus == nil { + logging.Debugf("Milvus configuration not provided. Using backend_config_path: %s", config.BackendConfigPath) + if config.BackendConfigPath == "" { + return fmt.Errorf("backend_config_path is required for Milvus cache backend") + } + // Ensure the Milvus configuration file exists + if _, err := os.Stat(config.BackendConfigPath); os.IsNotExist(err) { + logging.Debugf("Milvus config file not found: %s", config.BackendConfigPath) + return fmt.Errorf("milvus config file not found: %s", config.BackendConfigPath) + } + logging.Debugf("Milvus config file found: %s", config.BackendConfigPath) } - logging.Debugf("Milvus config file found: %s", config.BackendConfigPath) + logging.Debugf("Milvus configuration: %+v", config.Milvus) case RedisCacheType: - if config.BackendConfigPath == "" { - return fmt.Errorf("backend_config_path is required for Redis cache backend") - } - // Ensure the Redis configuration file exists - if _, err := os.Stat(config.BackendConfigPath); os.IsNotExist(err) { - logging.Debugf("Redis config file not found: %s", config.BackendConfigPath) - return fmt.Errorf("redis config file not found: %s", config.BackendConfigPath) + if config.Redis == nil { + logging.Debugf("Redis configuration not provided. 
Using backend_config_path: %s", config.BackendConfigPath) + if config.BackendConfigPath == "" { + return fmt.Errorf("backend_config_path is required for Redis cache backend") + } + // Ensure the Redis configuration file exists + if _, err := os.Stat(config.BackendConfigPath); os.IsNotExist(err) { + logging.Debugf("Redis config file not found: %s", config.BackendConfigPath) + return fmt.Errorf("redis config file not found: %s", config.BackendConfigPath) + } + logging.Debugf("Redis config file found: %s", config.BackendConfigPath) } - logging.Debugf("Redis config file found: %s", config.BackendConfigPath) } return nil diff --git a/src/semantic-router/pkg/cache/cache_interface.go b/src/semantic-router/pkg/cache/cache_interface.go index 1e66c52ce5..4a8c0be8e0 100644 --- a/src/semantic-router/pkg/cache/cache_interface.go +++ b/src/semantic-router/pkg/cache/cache_interface.go @@ -1,6 +1,10 @@ package cache -import "time" +import ( + "time" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" +) // CacheEntry represents a complete cached request-response pair with associated metadata type CacheEntry struct { @@ -112,7 +116,13 @@ type CacheConfig struct { // EvictionPolicy defines the eviction policy for in-memory cache ("fifo", "lru", "lfu") EvictionPolicy EvictionPolicyType `yaml:"eviction_policy,omitempty"` - // BackendConfigPath points to backend-specific configuration files + // Redis specific settings + Redis *config.RedisConfig `yaml:"redis,omitempty"` + + // Milvus specific settings + Milvus *config.MilvusConfig `yaml:"milvus,omitempty"` + + // BackendConfigPath points to backend-specific configuration files (Deprecated) BackendConfigPath string `yaml:"backend_config_path,omitempty"` // UseHNSW enables HNSW index for faster search in memory backend diff --git a/src/semantic-router/pkg/cache/cache_test.go b/src/semantic-router/pkg/cache/cache_test.go index 0d17cd76c3..3619655714 100644 --- a/src/semantic-router/pkg/cache/cache_test.go +++ 
b/src/semantic-router/pkg/cache/cache_test.go @@ -18,8 +18,10 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus/testutil" + "gopkg.in/yaml.v3" candle_binding "github.com/vllm-project/semantic-router/candle-binding" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" ) @@ -99,7 +101,7 @@ var _ = Describe("Cache Package", func() { }) }) - Context("with Milvus backend", func() { + Context("(Deprecated) with file base Milvus backend", func() { var milvusConfigPath string BeforeEach(func() { @@ -144,7 +146,7 @@ development: Expect(err).NotTo(HaveOccurred()) }) - It("should create Milvus cache backend successfully with valid config", func() { + It("should create Milvus cache backend successfully with valid config (Deprecated)", func() { config := CacheConfig{ BackendType: MilvusCacheType, Enabled: true, @@ -172,7 +174,7 @@ development: } }) - It("should handle disabled Milvus cache", func() { + It("should handle disabled Milvus cache (Deprecated)", func() { config := CacheConfig{ BackendType: MilvusCacheType, Enabled: false, @@ -189,7 +191,75 @@ development: }) }) - Context("with Redis backend", func() { + Context("with inline Milvus configuration", func() { + var milvusConfig *config.MilvusConfig + BeforeEach(func() { + // Skip Milvus tests if environment variable is set + if os.Getenv("SKIP_MILVUS_TESTS") == "true" { + Skip("Milvus tests skipped due to SKIP_MILVUS_TESTS=true") + } + + yamlConfig := ` +connection: + host: "localhost" + port: 19530 + database: "test_cache" + timeout: 30 +collection: + name: "test_semantic_cache" + description: "Test semantic cache collection" + vector_field: + name: "embedding" + dimension: 512 + metric_type: "IP" + index: + type: "HNSW" + params: + M: 16 + efConstruction: 64 +search: + params: + ef: 64 + topk: 10 + consistency_level: "Session" +development: + 
auto_create_collection: true + verbose_errors: true +` + err := yaml.Unmarshal([]byte(yamlConfig), &milvusConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should create Milvus cache backend successfully with valid config", func() { + config := CacheConfig{ + BackendType: MilvusCacheType, + Enabled: true, + SimilarityThreshold: 0.85, + TTLSeconds: 7200, + Milvus: milvusConfig, + EmbeddingModel: "bert", + } + + backend, err := NewCacheBackend(config) + + // Skip test if Milvus is not reachable + if err != nil { + if strings.Contains(err.Error(), "failed to create Milvus client") || + strings.Contains(err.Error(), "connection") || + strings.Contains(err.Error(), "dial") { + Skip("Milvus server not available: " + err.Error()) + } + // For other errors, fail the test + Expect(err).NotTo(HaveOccurred()) + } else { + // If Milvus is available, creation should succeed + Expect(backend).NotTo(BeNil()) + Expect(backend.IsEnabled()).To(BeTrue()) + } + }) + }) + + Context("(Deprecated) with Redis backend", func() { var redisConfigPath string BeforeEach(func() { @@ -233,7 +303,7 @@ development: Expect(err).NotTo(HaveOccurred()) }) - It("should create Redis cache backend successfully with valid config", func() { + It("should create Redis cache backend successfully with valid config (Deprecated)", func() { config := CacheConfig{ BackendType: RedisCacheType, Enabled: true, @@ -259,7 +329,7 @@ development: } }) - It("should handle disabled Redis cache", func() { + It("should handle disabled Redis cache (Deprecated)", func() { config := CacheConfig{ BackendType: RedisCacheType, Enabled: false, @@ -277,6 +347,79 @@ development: }) }) + Context("with inline Redis configuration", func() { + var redisConfig *config.RedisConfig + + BeforeEach(func() { + // Skip Redis tests if environment variable is set (Skip must run inside a setup node, not in the container body) + if os.Getenv("SKIP_REDIS_TESTS") == "true" { + Skip("Redis tests skipped due to SKIP_REDIS_TESTS=true") + } + yamlConfig := ` +connection: + host: "localhost" + port: 6379 + 
database: 0 + password: "" + timeout: 30 + tls: + enabled: false + cert_file: "" + key_file: "" + ca_file: "" +index: + name: "semantic_cache_idx" + prefix: "doc:" + vector_field: + name: "embedding" + dimension: 384 + metric_type: "COSINE" + index_type: "HNSW" + params: + M: 16 + efConstruction: 64 +search: + topk: 1 +logging: + level: "info" + enable_query_log: false + enable_metrics: true +development: + drop_index_on_startup: true + auto_create_index: true + verbose_errors: true +` + err := yaml.Unmarshal([]byte(yamlConfig), &redisConfig) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should create Redis cache backend successfully with valid config", func() { + config := CacheConfig{ + BackendType: RedisCacheType, + Enabled: true, + SimilarityThreshold: 0.8, + TTLSeconds: 3600, + Redis: redisConfig, + EmbeddingModel: "bert", + } + + backend, err := NewCacheBackend(config) + + if err != nil { + if strings.Contains(err.Error(), "failed to connect to Redis") || + strings.Contains(err.Error(), "connection refused") || + strings.Contains(err.Error(), "failed to initialize index") { + Skip("Redis server not available: " + err.Error()) + } + Expect(err).NotTo(HaveOccurred()) + } else { + Expect(backend).NotTo(BeNil()) + Expect(backend.IsEnabled()).To(BeTrue()) + Expect(backend.Close()).To(Succeed()) + } + }) + }) + Context("Milvus connection timeouts", func() { It("should respect connection timeout when endpoint is unreachable", func() { unreachableConfigPath := filepath.Join(tempDir, "milvus-unreachable.yaml") diff --git a/src/semantic-router/pkg/cache/hybrid_cache.go b/src/semantic-router/pkg/cache/hybrid_cache.go index ba8bdc5e68..e48b41a82f 100644 --- a/src/semantic-router/pkg/cache/hybrid_cache.go +++ b/src/semantic-router/pkg/cache/hybrid_cache.go @@ -10,6 +10,7 @@ import ( "time" candle_binding "github.com/vllm-project/semantic-router/candle-binding" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" 
"github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" ) @@ -104,6 +105,9 @@ type HybridCacheOptions struct { HNSWEfConstruction int // HNSW efConstruction parameter // Milvus settings + Milvus *config.MilvusConfig + + // (Deprecated) Milvus settings configuration path MilvusConfigPath string // Startup settings @@ -123,11 +127,21 @@ func NewHybridCache(options HybridCacheOptions) (*HybridCache, error) { } // Initialize Milvus backend - milvusOptions := MilvusCacheOptions{ - Enabled: true, - SimilarityThreshold: options.SimilarityThreshold, - TTLSeconds: options.TTLSeconds, - ConfigPath: options.MilvusConfigPath, + var milvusOptions MilvusCacheOptions + if options.Milvus != nil { + milvusOptions = MilvusCacheOptions{ + Enabled: true, + SimilarityThreshold: options.SimilarityThreshold, + TTLSeconds: options.TTLSeconds, + Config: options.Milvus, + } + } else { + milvusOptions = MilvusCacheOptions{ + Enabled: true, + SimilarityThreshold: options.SimilarityThreshold, + TTLSeconds: options.TTLSeconds, + ConfigPath: options.MilvusConfigPath, + } } milvusCache, err := NewMilvusCache(milvusOptions) diff --git a/src/semantic-router/pkg/cache/hybrid_cache_stub.go b/src/semantic-router/pkg/cache/hybrid_cache_stub.go index 5421d9f0f0..0d4bd9ae55 100644 --- a/src/semantic-router/pkg/cache/hybrid_cache_stub.go +++ b/src/semantic-router/pkg/cache/hybrid_cache_stub.go @@ -4,6 +4,8 @@ package cache import ( "context" + + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" ) // HybridCache combines in-memory HNSW index with external Milvus storage @@ -19,6 +21,7 @@ type HybridCacheOptions struct { MaxMemoryEntries int HNSWM int HNSWEfConstruction int + Milvus *config.MilvusConfig MilvusConfigPath string DisableRebuildOnStartup bool } diff --git a/src/semantic-router/pkg/cache/milvus_cache.go b/src/semantic-router/pkg/cache/milvus_cache.go 
index 6720bfaf40..c9fc020a32 100644 --- a/src/semantic-router/pkg/cache/milvus_cache.go +++ b/src/semantic-router/pkg/cache/milvus_cache.go @@ -14,92 +14,15 @@ import ( "sigs.k8s.io/yaml" candle_binding "github.com/vllm-project/semantic-router/candle-binding" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" ) -// MilvusConfig defines the complete configuration structure for Milvus cache backend. -// Fields use both json/yaml tags because sigs.k8s.io/yaml converts YAML→JSON before decoding, -// so json tags ensure snake_case keys map correctly without switching parsers. -type MilvusConfig struct { - Connection struct { - Host string `json:"host" yaml:"host"` - Port int `json:"port" yaml:"port"` - Database string `json:"database" yaml:"database"` - Timeout int `json:"timeout" yaml:"timeout"` - Auth struct { - Enabled bool `json:"enabled" yaml:"enabled"` - Username string `json:"username" yaml:"username"` - Password string `json:"password" yaml:"password"` - } `json:"auth" yaml:"auth"` - TLS struct { - Enabled bool `json:"enabled" yaml:"enabled"` - CertFile string `json:"cert_file" yaml:"cert_file"` - KeyFile string `json:"key_file" yaml:"key_file"` - CAFile string `json:"ca_file" yaml:"ca_file"` - } `json:"tls" yaml:"tls"` - } `json:"connection" yaml:"connection"` - Collection struct { - Name string `json:"name" yaml:"name"` - Description string `json:"description" yaml:"description"` - VectorField struct { - Name string `json:"name" yaml:"name"` - Dimension int `json:"dimension" yaml:"dimension"` - MetricType string `json:"metric_type" yaml:"metric_type"` - } `json:"vector_field" yaml:"vector_field"` - Index struct { - Type string `json:"type" yaml:"type"` - Params struct { - M int `json:"M" yaml:"M"` - EfConstruction int `json:"efConstruction" yaml:"efConstruction"` 
- } `json:"params" yaml:"params"` - } `json:"index" yaml:"index"` - } `json:"collection" yaml:"collection"` - Search struct { - Params struct { - Ef int `json:"ef" yaml:"ef"` - } `json:"params" yaml:"params"` - TopK int `json:"topk" yaml:"topk"` - ConsistencyLevel string `json:"consistency_level" yaml:"consistency_level"` - } `json:"search" yaml:"search"` - Performance struct { - ConnectionPool struct { - MaxConnections int `json:"max_connections" yaml:"max_connections"` - MaxIdleConnections int `json:"max_idle_connections" yaml:"max_idle_connections"` - AcquireTimeout int `json:"acquire_timeout" yaml:"acquire_timeout"` - } `json:"connection_pool" yaml:"connection_pool"` - Batch struct { - InsertBatchSize int `json:"insert_batch_size" yaml:"insert_batch_size"` - Timeout int `json:"timeout" yaml:"timeout"` - } `json:"batch" yaml:"batch"` - } `json:"performance" yaml:"performance"` - DataManagement struct { - TTL struct { - Enabled bool `json:"enabled" yaml:"enabled"` - TimestampField string `json:"timestamp_field" yaml:"timestamp_field"` - CleanupInterval int `json:"cleanup_interval" yaml:"cleanup_interval"` - } `json:"ttl" yaml:"ttl"` - Compaction struct { - Enabled bool `json:"enabled" yaml:"enabled"` - Interval int `json:"interval" yaml:"interval"` - } `json:"compaction" yaml:"compaction"` - } `json:"data_management" yaml:"data_management"` - Logging struct { - Level string `json:"level" yaml:"level"` - EnableQueryLog bool `json:"enable_query_log" yaml:"enable_query_log"` - EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"` - } `json:"logging" yaml:"logging"` - Development struct { - DropCollectionOnStartup bool `json:"drop_collection_on_startup" yaml:"drop_collection_on_startup"` - AutoCreateCollection bool `json:"auto_create_collection" yaml:"auto_create_collection"` - VerboseErrors bool `json:"verbose_errors" yaml:"verbose_errors"` - } `json:"development" yaml:"development"` -} - // MilvusCache provides a scalable semantic cache implementation 
using Milvus vector database type MilvusCache struct { client client.Client - config *MilvusConfig + config *config.MilvusConfig collectionName string similarityThreshold float32 ttlSeconds int @@ -115,6 +38,7 @@ type MilvusCacheOptions struct { SimilarityThreshold float32 TTLSeconds int Enabled bool + Config *config.MilvusConfig ConfigPath string } @@ -127,24 +51,30 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) { }, nil } - // Load Milvus configuration from file - logging.Debugf("MilvusCache: loading config from %s", options.ConfigPath) - config, err := loadMilvusConfig(options.ConfigPath) - if err != nil { - logging.Debugf("MilvusCache: failed to load config: %v", err) - return nil, fmt.Errorf("failed to load Milvus config: %w", err) + // (Fallback) Load Milvus configuration from a separate configuration file + var err error + var milvusConfig *config.MilvusConfig + if options.Config == nil { + logging.Warnf("(Deprecated) MilvusCache: loading config from %s", options.ConfigPath) + milvusConfig, err = loadMilvusConfig(options.ConfigPath) + if err != nil { + logging.Debugf("MilvusCache: failed to load config: %v", err) + return nil, fmt.Errorf("failed to load Milvus config: %w", err) + } + } else { + milvusConfig = options.Config } logging.Debugf("MilvusCache: config loaded - host=%s:%d, collection=%s, dimension=auto-detect", - config.Connection.Host, config.Connection.Port, config.Collection.Name) + milvusConfig.Connection.Host, milvusConfig.Connection.Port, milvusConfig.Collection.Name) // Establish connection to Milvus server - connectionString := fmt.Sprintf("%s:%d", config.Connection.Host, config.Connection.Port) + connectionString := fmt.Sprintf("%s:%d", milvusConfig.Connection.Host, milvusConfig.Connection.Port) logging.Debugf("MilvusCache: connecting to Milvus at %s", connectionString) dialCtx := context.Background() var cancel context.CancelFunc - if config.Connection.Timeout > 0 { + if milvusConfig.Connection.Timeout > 0 { //
If a timeout is specified, apply it to the connection context - timeout := time.Duration(config.Connection.Timeout) * time.Second + timeout := time.Duration(milvusConfig.Connection.Timeout) * time.Second dialCtx, cancel = context.WithTimeout(dialCtx, timeout) defer cancel() logging.Debugf("MilvusCache: connection timeout set to %s", timeout) @@ -157,8 +87,8 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) { cache := &MilvusCache{ client: milvusClient, - config: config, - collectionName: config.Collection.Name, + config: milvusConfig, + collectionName: milvusConfig.Collection.Name, similarityThreshold: options.SimilarityThreshold, ttlSeconds: options.TTLSeconds, enabled: options.Enabled, @@ -173,7 +103,7 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) { logging.Debugf("MilvusCache: successfully connected to Milvus") // Set up the collection for caching - logging.Debugf("MilvusCache: initializing collection '%s'", config.Collection.Name) + logging.Debugf("MilvusCache: initializing collection '%s'", milvusConfig.Collection.Name) if err := cache.initializeCollection(); err != nil { logging.Debugf("MilvusCache: failed to initialize collection: %v", err) milvusClient.Close() @@ -184,35 +114,35 @@ func NewMilvusCache(options MilvusCacheOptions) (*MilvusCache, error) { return cache, nil } -// loadMilvusConfig reads and parses the Milvus configuration from file -func loadMilvusConfig(configPath string) (*MilvusConfig, error) { +// loadMilvusConfig reads and parses the Milvus configuration from file (Deprecated) +func loadMilvusConfig(configPath string) (*config.MilvusConfig, error) { if configPath == "" { return nil, fmt.Errorf("milvus config path is required") } - fmt.Printf("[DEBUG] Loading Milvus config from: %s\n", configPath) + logging.Debugf("Loading Milvus config from: %s\n", configPath) data, err := os.ReadFile(configPath) if err != nil { return nil, fmt.Errorf("failed to read config file: %w", err) } - 
fmt.Printf("[DEBUG] Config file size: %d bytes\n", len(data)) + logging.Debugf("Config file size: %d bytes\n", len(data)) - var config MilvusConfig - if err := yaml.Unmarshal(data, &config); err != nil { + var milvusConfig *config.MilvusConfig + if err = yaml.Unmarshal(data, &milvusConfig); err != nil { return nil, fmt.Errorf("failed to parse config file: %w", err) } // Debug: Log what was parsed - fmt.Printf("[DEBUG] MilvusConfig parsed from %s:\n", configPath) - fmt.Printf("[DEBUG] Collection.Name: %s\n", config.Collection.Name) - fmt.Printf("[DEBUG] Collection.VectorField.Name: %s\n", config.Collection.VectorField.Name) - fmt.Printf("[DEBUG] Collection.VectorField.Dimension: %d\n", config.Collection.VectorField.Dimension) - fmt.Printf("[DEBUG] Collection.VectorField.MetricType: %s\n", config.Collection.VectorField.MetricType) - fmt.Printf("[DEBUG] Collection.Index.Type: %s\n", config.Collection.Index.Type) - fmt.Printf("[DEBUG] Development.AutoCreateCollection: %v\n", config.Development.AutoCreateCollection) - fmt.Printf("[DEBUG] Development.DropCollectionOnStartup: %v\n", config.Development.DropCollectionOnStartup) + logging.Debugf("MilvusConfig parsed from %s:\n", configPath) + logging.Debugf("Collection.Name: %s\n", milvusConfig.Collection.Name) + logging.Debugf("Collection.VectorField.Name: %s\n", milvusConfig.Collection.VectorField.Name) + logging.Debugf("Collection.VectorField.Dimension: %d\n", milvusConfig.Collection.VectorField.Dimension) + logging.Debugf("Collection.VectorField.MetricType: %s\n", milvusConfig.Collection.VectorField.MetricType) + logging.Debugf("Collection.Index.Type: %s\n", milvusConfig.Collection.Index.Type) + logging.Debugf("Development.AutoCreateCollection: %v\n", milvusConfig.Development.AutoCreateCollection) + logging.Debugf("Development.DropCollectionOnStartup: %v\n", milvusConfig.Development.DropCollectionOnStartup) // WORKAROUND: Force development settings for benchmarks/tests only // There seems to be a YAML parsing issue with 
sigs.k8s.io/yaml @@ -220,41 +150,41 @@ func loadMilvusConfig(configPath string) (*MilvusConfig, error) { benchmarkMode := os.Getenv("SR_BENCHMARK_MODE") testMode := os.Getenv("SR_TEST_MODE") if (benchmarkMode == "1" || benchmarkMode == "true" || testMode == "1" || testMode == "true") && - !config.Development.AutoCreateCollection && !config.Development.DropCollectionOnStartup { - fmt.Printf("[WARN] Development settings parsed as false, forcing to true for benchmarks/tests\n") - config.Development.AutoCreateCollection = true - config.Development.DropCollectionOnStartup = true + !milvusConfig.Development.AutoCreateCollection && !milvusConfig.Development.DropCollectionOnStartup { + logging.Warnf("Development settings parsed as false, forcing to true for benchmarks/tests\n") + milvusConfig.Development.AutoCreateCollection = true + milvusConfig.Development.DropCollectionOnStartup = true } // WORKAROUND: Force vector field settings if empty - if config.Collection.VectorField.Name == "" { - fmt.Printf("[WARN] VectorField.Name parsed as empty, setting to 'embedding'\n") - config.Collection.VectorField.Name = "embedding" + if milvusConfig.Collection.VectorField.Name == "" { + logging.Warnf("VectorField.Name parsed as empty, setting to 'embedding'\n") + milvusConfig.Collection.VectorField.Name = "embedding" } - if config.Collection.VectorField.MetricType == "" { - fmt.Printf("[WARN] VectorField.MetricType parsed as empty, setting to 'IP'\n") - config.Collection.VectorField.MetricType = "IP" + if milvusConfig.Collection.VectorField.MetricType == "" { + logging.Warnf("VectorField.MetricType parsed as empty, setting to 'IP'\n") + milvusConfig.Collection.VectorField.MetricType = "IP" } - if config.Collection.Index.Type == "" { - fmt.Printf("[WARN] Index.Type parsed as empty, setting to 'HNSW'\n") - config.Collection.Index.Type = "HNSW" + if milvusConfig.Collection.Index.Type == "" { + logging.Warnf("Index.Type parsed as empty, setting to 'HNSW'\n") + 
milvusConfig.Collection.Index.Type = "HNSW" } // Validate index params - if config.Collection.Index.Params.M == 0 { - fmt.Printf("[WARN] Index.Params.M parsed as 0, setting to 16\n") - config.Collection.Index.Params.M = 16 + if milvusConfig.Collection.Index.Params.M == 0 { + logging.Warnf("Index.Params.M parsed as 0, setting to 16\n") + milvusConfig.Collection.Index.Params.M = 16 } - if config.Collection.Index.Params.EfConstruction == 0 { - fmt.Printf("[WARN] Index.Params.EfConstruction parsed as 0, setting to 64\n") - config.Collection.Index.Params.EfConstruction = 64 + if milvusConfig.Collection.Index.Params.EfConstruction == 0 { + logging.Warnf("Index.Params.EfConstruction parsed as 0, setting to 64\n") + milvusConfig.Collection.Index.Params.EfConstruction = 64 } // Validate search params - if config.Search.Params.Ef == 0 { - fmt.Printf("[WARN] Search.Params.Ef parsed as 0, setting to 64\n") - config.Search.Params.Ef = 64 + if milvusConfig.Search.Params.Ef == 0 { + logging.Warnf("Search.Params.Ef parsed as 0, setting to 64\n") + milvusConfig.Search.Params.Ef = 64 } - return &config, nil + return milvusConfig, nil } // initializeCollection sets up the Milvus collection and index structures diff --git a/src/semantic-router/pkg/cache/redis_cache.go b/src/semantic-router/pkg/cache/redis_cache.go index d83f328ee2..f037c1a39c 100644 --- a/src/semantic-router/pkg/cache/redis_cache.go +++ b/src/semantic-router/pkg/cache/redis_cache.go @@ -16,58 +16,15 @@ import ( "sigs.k8s.io/yaml" candle_binding "github.com/vllm-project/semantic-router/candle-binding" + "github.com/vllm-project/semantic-router/src/semantic-router/pkg/config" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/logging" "github.com/vllm-project/semantic-router/src/semantic-router/pkg/observability/metrics" ) -// RedisConfig defines the complete configuration structure for Redis cache backend. 
-type RedisConfig struct { - Connection struct { - Host string `json:"host" yaml:"host"` - Port int `json:"port" yaml:"port"` - Database int `json:"database" yaml:"database"` - Password string `json:"password" yaml:"password"` - Timeout int `json:"timeout" yaml:"timeout"` - TLS struct { - Enabled bool `json:"enabled" yaml:"enabled"` - CertFile string `json:"cert_file" yaml:"cert_file"` - KeyFile string `json:"key_file" yaml:"key_file"` - CAFile string `json:"ca_file" yaml:"ca_file"` - } `json:"tls" yaml:"tls"` - } `json:"connection" yaml:"connection"` - Index struct { - Name string `json:"name" yaml:"name"` - Prefix string `json:"prefix" yaml:"prefix"` - VectorField struct { - Name string `json:"name" yaml:"name"` - Dimension int `json:"dimension" yaml:"dimension"` - MetricType string `json:"metric_type" yaml:"metric_type"` // L2, IP, COSINE - } `json:"vector_field" yaml:"vector_field"` - IndexType string `json:"index_type" yaml:"index_type"` // HNSW or FLAT - Params struct { - M int `json:"M" yaml:"M"` - EfConstruction int `json:"efConstruction" yaml:"efConstruction"` - } `json:"params" yaml:"params"` - } `json:"index" yaml:"index"` - Search struct { - TopK int `json:"topk" yaml:"topk"` - } `json:"search" yaml:"search"` - Development struct { - DropIndexOnStartup bool `json:"drop_index_on_startup" yaml:"drop_index_on_startup"` - AutoCreateIndex bool `json:"auto_create_index" yaml:"auto_create_index"` - VerboseErrors bool `json:"verbose_errors" yaml:"verbose_errors"` - } `json:"development" yaml:"development"` - Logging struct { - Level string `json:"level" yaml:"level"` - EnableQueryLog bool `json:"enable_query_log" yaml:"enable_query_log"` - EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"` - } `json:"logging" yaml:"logging"` -} - // RedisCache provides a scalable semantic cache implementation using Redis with vector search type RedisCache struct { client *redis.Client - config *RedisConfig + config *config.RedisConfig indexName string 
similarityThreshold float32 ttlSeconds int @@ -83,6 +40,7 @@ type RedisCacheOptions struct { SimilarityThreshold float32 TTLSeconds int Enabled bool + Config *config.RedisConfig ConfigPath string } @@ -95,30 +53,36 @@ func NewRedisCache(options RedisCacheOptions) (*RedisCache, error) { }, nil } - // Load Redis configuration from file - logging.Debugf("RedisCache: loading config from %s", options.ConfigPath) - config, err := loadRedisConfig(options.ConfigPath) - if err != nil { - logging.Debugf("RedisCache: failed to load config: %v", err) - return nil, fmt.Errorf("failed to load Redis config: %w", err) + // (Fallback) Load Redis configuration from a separate configuration file + var err error + var redisConfig *config.RedisConfig + if options.Config == nil { + logging.Warnf("(Deprecated) RedisCache: loading config from %s", options.ConfigPath) + redisConfig, err = loadRedisConfig(options.ConfigPath) + if err != nil { + logging.Debugf("RedisCache: failed to load config: %v", err) + return nil, fmt.Errorf("failed to load Redis config: %w", err) + } + } else { + redisConfig = options.Config } logging.Debugf("RedisCache: config loaded - host=%s:%d, index=%s, dimension=auto-detect", - config.Connection.Host, config.Connection.Port, config.Index.Name) + redisConfig.Connection.Host, redisConfig.Connection.Port, redisConfig.Index.Name) // Establish connection to Redis server - logging.Debugf("RedisCache: connecting to Redis at %s:%d", config.Connection.Host, config.Connection.Port) + logging.Debugf("RedisCache: connecting to Redis at %s:%d", redisConfig.Connection.Host, redisConfig.Connection.Port) redisClient := redis.NewClient(&redis.Options{ - Addr: fmt.Sprintf("%s:%d", config.Connection.Host, config.Connection.Port), - Password: config.Connection.Password, - DB: config.Connection.Database, + Addr: fmt.Sprintf("%s:%d", redisConfig.Connection.Host, redisConfig.Connection.Port), + Password: redisConfig.Connection.Password, + DB: redisConfig.Connection.Database, Protocol:
2, // Use RESP2 protocol for compatibility }) cache := &RedisCache{ client: redisClient, - config: config, - indexName: config.Index.Name, + config: redisConfig, + indexName: redisConfig.Index.Name, similarityThreshold: options.SimilarityThreshold, ttlSeconds: options.TTLSeconds, enabled: options.Enabled, @@ -132,7 +96,7 @@ func NewRedisCache(options RedisCacheOptions) (*RedisCache, error) { logging.Debugf("RedisCache: successfully connected to Redis") // Set up the index for vector search - logging.Debugf("RedisCache: initializing index '%s'", config.Index.Name) + logging.Debugf("RedisCache: initializing index '%s'", redisConfig.Index.Name) if err := cache.initializeIndex(); err != nil { logging.Debugf("RedisCache: failed to initialize index: %v", err) redisClient.Close() @@ -143,8 +107,8 @@ func NewRedisCache(options RedisCacheOptions) (*RedisCache, error) { return cache, nil } -// loadRedisConfig reads and parses the Redis configuration from file -func loadRedisConfig(configPath string) (*RedisConfig, error) { +// loadRedisConfig reads and parses the Redis configuration from a file (Deprecated) +func loadRedisConfig(configPath string) (*config.RedisConfig, error) { if configPath == "" { return nil, fmt.Errorf("redis config path is required") } @@ -156,42 +120,42 @@ func loadRedisConfig(configPath string) (*RedisConfig, error) { return nil, fmt.Errorf("failed to read config file: %w", err) } - var config RedisConfig - if err := yaml.Unmarshal(data, &config); err != nil { + var redisConfig *config.RedisConfig + if err := yaml.Unmarshal(data, &redisConfig); err != nil { return nil, fmt.Errorf("failed to parse config file: %w", err) } logging.Debugf("Redis config loaded: index=%s, dimension=%d, metric=%s", - config.Index.Name, config.Index.VectorField.Dimension, config.Index.VectorField.MetricType) + redisConfig.Index.Name, redisConfig.Index.VectorField.Dimension, redisConfig.Index.VectorField.MetricType) // Apply defaults - if config.Index.VectorField.Name == "" { 
- config.Index.VectorField.Name = "embedding" + if redisConfig.Index.VectorField.Name == "" { + redisConfig.Index.VectorField.Name = "embedding" logging.Warnf("VectorField.Name not specified, using default: embedding") } - if config.Index.VectorField.MetricType == "" { - config.Index.VectorField.MetricType = "COSINE" + if redisConfig.Index.VectorField.MetricType == "" { + redisConfig.Index.VectorField.MetricType = "COSINE" } - if config.Index.IndexType == "" { - config.Index.IndexType = "HNSW" + if redisConfig.Index.IndexType == "" { + redisConfig.Index.IndexType = "HNSW" } - if config.Index.Prefix == "" { - config.Index.Prefix = "doc:" + if redisConfig.Index.Prefix == "" { + redisConfig.Index.Prefix = "doc:" } // Validate index params for HNSW - if config.Index.IndexType == "HNSW" { - if config.Index.Params.M == 0 { - config.Index.Params.M = 16 + if redisConfig.Index.IndexType == "HNSW" { + if redisConfig.Index.Params.M == 0 { + redisConfig.Index.Params.M = 16 } - if config.Index.Params.EfConstruction == 0 { - config.Index.Params.EfConstruction = 64 + if redisConfig.Index.Params.EfConstruction == 0 { + redisConfig.Index.Params.EfConstruction = 64 } } - if config.Search.TopK == 0 { - config.Search.TopK = 1 + if redisConfig.Search.TopK == 0 { + redisConfig.Search.TopK = 1 } - return &config, nil + return redisConfig, nil } // initializeIndex sets up the Redis index for vector search diff --git a/src/semantic-router/pkg/config/config.go b/src/semantic-router/pkg/config/config.go index 5dadcb0443..6c00c763cb 100644 --- a/src/semantic-router/pkg/config/config.go +++ b/src/semantic-router/pkg/config/config.go @@ -457,6 +457,128 @@ func (l *LooperConfig) GetTimeout() int { return l.TimeoutSeconds } +// RedisConfig defines the complete configuration structure for Redis cache backend. 
+type RedisConfig struct { + Connection struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + Database int `json:"database" yaml:"database"` + Password string `json:"password" yaml:"password"` + Timeout int `json:"timeout" yaml:"timeout"` + TLS struct { + Enabled bool `json:"enabled" yaml:"enabled"` + CertFile string `json:"cert_file" yaml:"cert_file"` + KeyFile string `json:"key_file" yaml:"key_file"` + CAFile string `json:"ca_file" yaml:"ca_file"` + } `json:"tls" yaml:"tls"` + } `json:"connection" yaml:"connection"` + Index struct { + Name string `json:"name" yaml:"name"` + Prefix string `json:"prefix" yaml:"prefix"` + VectorField struct { + Name string `json:"name" yaml:"name"` + Dimension int `json:"dimension" yaml:"dimension"` + MetricType string `json:"metric_type" yaml:"metric_type"` // L2, IP, COSINE + } `json:"vector_field" yaml:"vector_field"` + IndexType string `json:"index_type" yaml:"index_type"` // HNSW or FLAT + Params struct { + M int `json:"M" yaml:"M"` + EfConstruction int `json:"efConstruction" yaml:"efConstruction"` + } `json:"params" yaml:"params"` + } `json:"index" yaml:"index"` + Search struct { + TopK int `json:"topk" yaml:"topk"` + } `json:"search" yaml:"search"` + Development struct { + DropIndexOnStartup bool `json:"drop_index_on_startup" yaml:"drop_index_on_startup"` + AutoCreateIndex bool `json:"auto_create_index" yaml:"auto_create_index"` + VerboseErrors bool `json:"verbose_errors" yaml:"verbose_errors"` + } `json:"development" yaml:"development"` + Logging struct { + Level string `json:"level" yaml:"level"` + EnableQueryLog bool `json:"enable_query_log" yaml:"enable_query_log"` + EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"` + } `json:"logging" yaml:"logging"` +} + +// MilvusConfig defines the complete configuration structure for Milvus cache backend. 
+// Fields use both json/yaml tags because sigs.k8s.io/yaml converts YAML→JSON before decoding, +// so json tags ensure snake_case keys map correctly without switching parsers. +type MilvusConfig struct { + Connection struct { + Host string `json:"host" yaml:"host"` + Port int `json:"port" yaml:"port"` + Database string `json:"database" yaml:"database"` + Timeout int `json:"timeout" yaml:"timeout"` + Auth struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Username string `json:"username" yaml:"username"` + Password string `json:"password" yaml:"password"` + } `json:"auth" yaml:"auth"` + TLS struct { + Enabled bool `json:"enabled" yaml:"enabled"` + CertFile string `json:"cert_file" yaml:"cert_file"` + KeyFile string `json:"key_file" yaml:"key_file"` + CAFile string `json:"ca_file" yaml:"ca_file"` + } `json:"tls" yaml:"tls"` + } `json:"connection" yaml:"connection"` + Collection struct { + Name string `json:"name" yaml:"name"` + Description string `json:"description" yaml:"description"` + VectorField struct { + Name string `json:"name" yaml:"name"` + Dimension int `json:"dimension" yaml:"dimension"` + MetricType string `json:"metric_type" yaml:"metric_type"` + } `json:"vector_field" yaml:"vector_field"` + Index struct { + Type string `json:"type" yaml:"type"` + Params struct { + M int `json:"M" yaml:"M"` + EfConstruction int `json:"efConstruction" yaml:"efConstruction"` + } `json:"params" yaml:"params"` + } `json:"index" yaml:"index"` + } `json:"collection" yaml:"collection"` + Search struct { + Params struct { + Ef int `json:"ef" yaml:"ef"` + } `json:"params" yaml:"params"` + TopK int `json:"topk" yaml:"topk"` + ConsistencyLevel string `json:"consistency_level" yaml:"consistency_level"` + } `json:"search" yaml:"search"` + Performance struct { + ConnectionPool struct { + MaxConnections int `json:"max_connections" yaml:"max_connections"` + MaxIdleConnections int `json:"max_idle_connections" yaml:"max_idle_connections"` + AcquireTimeout int 
`json:"acquire_timeout" yaml:"acquire_timeout"` + } `json:"connection_pool" yaml:"connection_pool"` + Batch struct { + InsertBatchSize int `json:"insert_batch_size" yaml:"insert_batch_size"` + Timeout int `json:"timeout" yaml:"timeout"` + } `json:"batch" yaml:"batch"` + } `json:"performance" yaml:"performance"` + DataManagement struct { + TTL struct { + Enabled bool `json:"enabled" yaml:"enabled"` + TimestampField string `json:"timestamp_field" yaml:"timestamp_field"` + CleanupInterval int `json:"cleanup_interval" yaml:"cleanup_interval"` + } `json:"ttl" yaml:"ttl"` + Compaction struct { + Enabled bool `json:"enabled" yaml:"enabled"` + Interval int `json:"interval" yaml:"interval"` + } `json:"compaction" yaml:"compaction"` + } `json:"data_management" yaml:"data_management"` + Logging struct { + Level string `json:"level" yaml:"level"` + EnableQueryLog bool `json:"enable_query_log" yaml:"enable_query_log"` + EnableMetrics bool `json:"enable_metrics" yaml:"enable_metrics"` + } `json:"logging" yaml:"logging"` + Development struct { + DropCollectionOnStartup bool `json:"drop_collection_on_startup" yaml:"drop_collection_on_startup"` + AutoCreateCollection bool `json:"auto_create_collection" yaml:"auto_create_collection"` + VerboseErrors bool `json:"verbose_errors" yaml:"verbose_errors"` + } `json:"development" yaml:"development"` +} + type SemanticCache struct { // Type of cache backend to use BackendType string `yaml:"backend_type,omitempty"` @@ -477,7 +599,13 @@ type SemanticCache struct { // Eviction policy for in-memory cache ("fifo", "lru", "lfu") EvictionPolicy string `yaml:"eviction_policy,omitempty"` - // Path to backend-specific configuration file + // Redis configuration + Redis *RedisConfig `yaml:"redis,omitempty"` + + // Milvus configuration + Milvus *MilvusConfig `yaml:"milvus,omitempty"` + + // BackendConfigPath is a path to the backend-specific configuration file (Deprecated) BackendConfigPath string `yaml:"backend_config_path,omitempty"` // Embedding 
model to use for semantic similarity ("bert", "qwen3", "gemma") diff --git a/src/semantic-router/pkg/config/config_test.go b/src/semantic-router/pkg/config/config_test.go index 9694016e14..1f6518eeb0 100644 --- a/src/semantic-router/pkg/config/config_test.go +++ b/src/semantic-router/pkg/config/config_test.go @@ -1177,7 +1177,7 @@ semantic_cache: }) }) - Context("with milvus backend configuration", func() { + Context("(Deprecated) with file base milvus backend configuration", func() { BeforeEach(func() { configContent := ` semantic_cache: @@ -1191,7 +1191,7 @@ semantic_cache: Expect(err).NotTo(HaveOccurred()) }) - It("should parse milvus backend configuration correctly", func() { + It("should parse file base milvus backend configuration correctly", func() { cfg, err := Load(configFile) Expect(err).NotTo(HaveOccurred()) @@ -1206,6 +1206,267 @@ semantic_cache: }) }) + Context("with inline milvus backend configuration", func() { + BeforeEach(func() { + configContent := ` +semantic_cache: + enabled: true + backend_type: "milvus" + similarity_threshold: 0.9 + ttl_seconds: 7200 + milvus: + connection: + host: "localhost" + port: 12345 + database: "semantic_router_cache" + timeout: 99 + auth: + enabled: false + username: "1234" + password: "1234" + tls: + enabled: false + cert_file: "" + key_file: "" + ca_file: "" + collection: + name: "semantic_cache" + description: "test" + vector_field: + name: "test" + dimension: 384 + metric_type: "test" + index: + type: "HNSW" + params: + M: 16 + efConstruction: 64 + search: + params: + ef: 64 + topk: 10 + consistency_level: "Session" + performance: + connection_pool: + max_connections: 10 + max_idle_connections: 5 + acquire_timeout: 5 + batch: + insert_batch_size: 1000 + timeout: 30 + data_management: + ttl: + enabled: true + timestamp_field: "timestamp" + cleanup_interval: 3600 + compaction: + enabled: true + interval: 86400 + logging: + level: "info" + enable_query_log: false + enable_metrics: true + development: + 
drop_collection_on_startup: true + auto_create_collection: true + verbose_errors: true +` + err := os.WriteFile(configFile, []byte(configContent), 0o644) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should parse inline milvus backend connection configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.Connection).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.Connection.Host).To(Equal("localhost")) + Expect(cfg.SemanticCache.Milvus.Connection.Port).To(Equal(12345)) + Expect(cfg.SemanticCache.Milvus.Connection.Database).To(Equal("semantic_router_cache")) + Expect(cfg.SemanticCache.Milvus.Connection.Timeout).To(Equal(99)) + Expect(cfg.SemanticCache.Milvus.Connection.Auth.Enabled).To(BeFalse()) + Expect(cfg.SemanticCache.Milvus.Connection.Auth.Username).To(Equal("1234")) + Expect(cfg.SemanticCache.Milvus.Connection.Auth.Password).To(Equal("1234")) + Expect(cfg.SemanticCache.Milvus.Connection.TLS.Enabled).To(BeFalse()) + Expect(cfg.SemanticCache.Milvus.Connection.TLS.CertFile).To(Equal("")) + Expect(cfg.SemanticCache.Milvus.Connection.TLS.KeyFile).To(Equal("")) + Expect(cfg.SemanticCache.Milvus.Connection.TLS.CAFile).To(Equal("")) + }) + + It("should parse inline milvus backend collection configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.Collection).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.Collection.Name).To(Equal("semantic_cache")) + Expect(cfg.SemanticCache.Milvus.Collection.Description).To(Equal("test")) + Expect(cfg.SemanticCache.Milvus.Collection.VectorField.Name).To(Equal("test")) + Expect(cfg.SemanticCache.Milvus.Collection.VectorField.Dimension).To(Equal(384)) + Expect(cfg.SemanticCache.Milvus.Collection.VectorField.MetricType).To(Equal("test")) + Expect(cfg.SemanticCache.Milvus.Collection.Index.Type).To(Equal("HNSW")) + 
Expect(cfg.SemanticCache.Milvus.Collection.Index.Params.M).To(Equal(16)) + Expect(cfg.SemanticCache.Milvus.Collection.Index.Params.EfConstruction).To(Equal(64)) + }) + + It("should parse inline milvus backend search configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.Search).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.Search.Params.Ef).To(Equal(64)) + Expect(cfg.SemanticCache.Milvus.Search.TopK).To(Equal(10)) + Expect(cfg.SemanticCache.Milvus.Search.ConsistencyLevel).To(Equal("Session")) + }) + + It("should parse inline milvus backend performance configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.Performance).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.Performance.ConnectionPool.MaxConnections).To(Equal(10)) + Expect(cfg.SemanticCache.Milvus.Performance.ConnectionPool.MaxIdleConnections).To(Equal(5)) + Expect(cfg.SemanticCache.Milvus.Performance.ConnectionPool.AcquireTimeout).To(Equal(5)) + Expect(cfg.SemanticCache.Milvus.Performance.Batch.InsertBatchSize).To(Equal(1000)) + Expect(cfg.SemanticCache.Milvus.Performance.Batch.Timeout).To(Equal(30)) + }) + + It("should parse inline milvus backend data management configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.DataManagement).ToNot(BeNil()) + Expect(cfg.SemanticCache.Milvus.DataManagement.TTL.Enabled).To(BeTrue()) + Expect(cfg.SemanticCache.Milvus.DataManagement.TTL.TimestampField).To(Equal("timestamp")) + Expect(cfg.SemanticCache.Milvus.DataManagement.TTL.CleanupInterval).To(Equal(3600)) + Expect(cfg.SemanticCache.Milvus.DataManagement.Compaction.Enabled).To(BeTrue()) + Expect(cfg.SemanticCache.Milvus.DataManagement.Compaction.Interval).To(Equal(86400)) + }) + + It("should parse inline milvus backend logging configuration correctly", func() { + cfg, 
err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.Logging.Level).To(Equal("info")) + Expect(cfg.SemanticCache.Milvus.Logging.EnableQueryLog).To(BeFalse()) + Expect(cfg.SemanticCache.Milvus.Logging.EnableMetrics).To(BeTrue()) + }) + + It("should parse inline milvus backend development configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Milvus.Development.DropCollectionOnStartup).To(BeTrue()) + Expect(cfg.SemanticCache.Milvus.Development.AutoCreateCollection).To(BeTrue()) + Expect(cfg.SemanticCache.Milvus.Development.VerboseErrors).To(BeTrue()) + }) + }) + + Context("with inline redis backend configuration", func() { + BeforeEach(func() { + configContent := ` +semantic_cache: + enabled: true + backend_type: "milvus" + similarity_threshold: 0.9 + ttl_seconds: 7200 + redis: + connection: + host: "localhost" + port: 6379 + database: 0 + password: "" + timeout: 30 + tls: + enabled: false + cert_file: "" + key_file: "" + ca_file: "" + index: + name: "semantic_cache_idx" + prefix: "doc:" + vector_field: + name: "embedding" + dimension: 384 + metric_type: "COSINE" + index_type: "HNSW" + params: + M: 16 + efConstruction: 64 + search: + topk: 1 + logging: + level: "info" + enable_query_log: false + enable_metrics: true + development: + drop_index_on_startup: true + auto_create_index: true + verbose_errors: true +` + err := os.WriteFile(configFile, []byte(configContent), 0o644) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should parse inline redis backend connection configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Redis).ToNot(BeNil()) + Expect(cfg.SemanticCache.Redis.Connection).ToNot(BeNil()) + Expect(cfg.SemanticCache.Redis.Connection.Host).To(Equal("localhost")) + Expect(cfg.SemanticCache.Redis.Connection.Port).To(Equal(6379)) + 
Expect(cfg.SemanticCache.Redis.Connection.Database).To(Equal(0)) + Expect(cfg.SemanticCache.Redis.Connection.Password).To(Equal("")) + Expect(cfg.SemanticCache.Redis.Connection.Timeout).To(Equal(30)) + Expect(cfg.SemanticCache.Redis.Connection.TLS.Enabled).To(BeFalse()) + Expect(cfg.SemanticCache.Redis.Connection.TLS.CertFile).To(Equal("")) + Expect(cfg.SemanticCache.Redis.Connection.TLS.KeyFile).To(Equal("")) + Expect(cfg.SemanticCache.Redis.Connection.TLS.CAFile).To(Equal("")) + }) + + It("should parse inline redis backend index configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Redis.Index).ToNot(BeNil()) + Expect(cfg.SemanticCache.Redis.Index.Name).To(Equal("semantic_cache_idx")) + Expect(cfg.SemanticCache.Redis.Index.Prefix).To(Equal("doc:")) + Expect(cfg.SemanticCache.Redis.Index.VectorField.Name).To(Equal("embedding")) + Expect(cfg.SemanticCache.Redis.Index.VectorField.Dimension).To(Equal(384)) + Expect(cfg.SemanticCache.Redis.Index.VectorField.MetricType).To(Equal("COSINE")) + Expect(cfg.SemanticCache.Redis.Index.IndexType).To(Equal("HNSW")) + Expect(cfg.SemanticCache.Redis.Index.Params.M).To(Equal(16)) + Expect(cfg.SemanticCache.Redis.Index.Params.EfConstruction).To(Equal(64)) + }) + + It("should parse inline redis backend search configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Redis.Search).ToNot(BeNil()) + Expect(cfg.SemanticCache.Redis.Search.TopK).To(Equal(1)) + }) + + It("should parse inline redis backend logging configuration correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Redis.Logging.Level).To(Equal("info")) + Expect(cfg.SemanticCache.Redis.Logging.EnableQueryLog).To(BeFalse()) + Expect(cfg.SemanticCache.Redis.Logging.EnableMetrics).To(BeTrue()) + }) + + It("should parse inline redis backend development configuration 
correctly", func() { + cfg, err := Load(configFile) + Expect(err).NotTo(HaveOccurred()) + + Expect(cfg.SemanticCache.Redis.Development.DropIndexOnStartup).To(BeTrue()) + Expect(cfg.SemanticCache.Redis.Development.AutoCreateIndex).To(BeTrue()) + Expect(cfg.SemanticCache.Redis.Development.VerboseErrors).To(BeTrue()) + }) + }) + Context("with disabled cache", func() { BeforeEach(func() { configContent := ` diff --git a/src/semantic-router/pkg/extproc/router.go b/src/semantic-router/pkg/extproc/router.go index 84e9e51943..57e4358af9 100644 --- a/src/semantic-router/pkg/extproc/router.go +++ b/src/semantic-router/pkg/extproc/router.go @@ -113,6 +113,8 @@ func NewOpenAIRouter(configPath string) (*OpenAIRouter, error) { MaxEntries: cfg.SemanticCache.MaxEntries, TTLSeconds: cfg.SemanticCache.TTLSeconds, EvictionPolicy: cache.EvictionPolicyType(cfg.SemanticCache.EvictionPolicy), + Redis: cfg.SemanticCache.Redis, + Milvus: cfg.SemanticCache.Milvus, BackendConfigPath: cfg.SemanticCache.BackendConfigPath, EmbeddingModel: cfg.SemanticCache.EmbeddingModel, }