diff --git a/framework/go.mod b/framework/go.mod index 1fdb49d535..a51e79bdc5 100644 --- a/framework/go.mod +++ b/framework/go.mod @@ -12,6 +12,7 @@ require ( github.com/weaviate/weaviate v1.36.5 github.com/weaviate/weaviate-go-client/v5 v5.7.1 golang.org/x/crypto v0.49.0 + golang.org/x/sync v0.20.0 gorm.io/driver/sqlite v1.6.0 gorm.io/gorm v1.31.1 ) @@ -63,7 +64,6 @@ require ( go.opentelemetry.io/otel/trace v1.40.0 // indirect go.starlark.net v0.0.0-20260102030733-3fee463870c9 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect - golang.org/x/sync v0.20.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect ) diff --git a/framework/logstore/matviews.go b/framework/logstore/matviews.go new file mode 100644 index 0000000000..bf51748693 --- /dev/null +++ b/framework/logstore/matviews.go @@ -0,0 +1,869 @@ +package logstore + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/maximhq/bifrost/core/schemas" + "gorm.io/gorm" +) + +// --------------------------------------------------------------------------- +// Materialized view definitions +// --------------------------------------------------------------------------- + +// mvLogsHourlyDDL creates a materialized view that pre-aggregates logs into +// hourly buckets grouped by provider, model, status, object_type, and key IDs. +// Includes exact percentiles (p90/p95/p99) computed per hour so they can be +// re-aggregated via weighted averages across wider time ranges. +const mvLogsHourlyDDL = ` +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_hourly AS +SELECT + date_trunc('hour', timestamp) AS hour, + provider, + model, + status, + object_type, + selected_key_id, + COALESCE(virtual_key_id, '') AS virtual_key_id, + COALESCE(routing_rule_id, '') AS routing_rule_id, + COUNT(*) AS count, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS success_count, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) AS error_count, + COALESCE(AVG(latency), 0) AS avg_latency, + COALESCE(percentile_cont(0.90) WITHIN GROUP (ORDER BY latency), 0) AS p90_latency, + COALESCE(percentile_cont(0.95) WITHIN GROUP (ORDER BY latency), 0) AS p95_latency, + COALESCE(percentile_cont(0.99) WITHIN GROUP (ORDER BY latency), 0) AS p99_latency, + COALESCE(SUM(prompt_tokens), 0) AS total_prompt_tokens, + COALESCE(SUM(completion_tokens), 0) AS total_completion_tokens, + COALESCE(SUM(total_tokens), 0) AS total_tokens, + COALESCE(SUM(cached_read_tokens), 0) AS total_cached_read_tokens, + COALESCE(SUM(cost), 0) AS total_cost +FROM logs +WHERE status IN ('success', 'error') +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8 +` + +// mvLogsHourlyUniqueIdx is required for REFRESH MATERIALIZED VIEW CONCURRENTLY. +const mvLogsHourlyUniqueIdx = ` +CREATE UNIQUE INDEX IF NOT EXISTS mv_logs_hourly_uniq +ON mv_logs_hourly (hour, provider, model, status, object_type, selected_key_id, virtual_key_id, routing_rule_id) +` + +// mvLogsFilterdataDDL creates a materialized view of distinct filter values +// (models, providers, keys, routing rules, engines) from logs in the last 60 +// days. Used to populate filter dropdowns without scanning the raw table. 
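+// For example, on PostgreSQL GetDistinctModels reduces to roughly
+//
+//	SELECT DISTINCT model FROM mv_logs_filterdata;
+//
+// instead of a DISTINCT scan over the last 60 days of the logs table.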
+const mvLogsFilterdataDDL = ` +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_filterdata AS +SELECT DISTINCT + model, + provider, + selected_key_id, + selected_key_name, + COALESCE(virtual_key_id, '') AS virtual_key_id, + COALESCE(virtual_key_name, '') AS virtual_key_name, + COALESCE(routing_rule_id, '') AS routing_rule_id, + COALESCE(routing_rule_name, '') AS routing_rule_name, + COALESCE(routing_engines_used, '') AS routing_engines_used +FROM logs +WHERE timestamp >= NOW() - INTERVAL '60 days' + AND model IS NOT NULL AND model != '' +` + +// mvLogsFilterdataUniqueIdx is required for REFRESH MATERIALIZED VIEW CONCURRENTLY. +// Includes both ID and name columns so renamed keys don't cause duplicate violations. +const mvLogsFilterdataUniqueIdx = ` +CREATE UNIQUE INDEX IF NOT EXISTS mv_logs_filterdata_uniq +ON mv_logs_filterdata (model, provider, selected_key_id, selected_key_name, virtual_key_id, virtual_key_name, routing_rule_id, routing_rule_name, routing_engines_used) +` + +// --------------------------------------------------------------------------- +// View lifecycle +// --------------------------------------------------------------------------- + +// ensureMatViews creates materialized views and their unique indexes if they +// don't already exist. Called once on startup. +func ensureMatViews(ctx context.Context, db *gorm.DB) error { + for _, ddl := range []string{ + mvLogsHourlyDDL, + mvLogsHourlyUniqueIdx, + mvLogsFilterdataDDL, + mvLogsFilterdataUniqueIdx, + } { + if err := db.WithContext(ctx).Exec(ddl).Error; err != nil { + return fmt.Errorf("failed to create materialized view: %w", err) + } + } + return nil +} + +// refreshMatViews refreshes all materialized views concurrently (non-blocking +// for readers). Uses a PostgreSQL advisory try-lock so that in multi-replica +// deployments only one instance refreshes at a time — others skip silently. +func refreshMatViews(ctx context.Context, db *gorm.DB) error { + sqlDB, err := db.DB() + if err != nil { + return fmt.Errorf("failed to get sql.DB for matview refresh: %w", err) + } + + // Use a dedicated connection so lock/unlock/refresh all run on the same session. + conn, err := sqlDB.Conn(ctx) + if err != nil { + return fmt.Errorf("failed to get dedicated connection for matview refresh: %w", err) + } + defer conn.Close() + + // Try to acquire advisory lock; skip refresh if another replica holds it. + var acquired bool + if err := conn.QueryRowContext(ctx, "SELECT pg_try_advisory_lock($1)", matviewRefreshAdvisoryLockKey).Scan(&acquired); err != nil { + return fmt.Errorf("failed to try advisory lock for matview refresh: %w", err) + } + if !acquired { + return nil // another replica is refreshing + } + defer func() { + // Release lock explicitly; connection close would also release session-scoped locks. + _, _ = conn.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", matviewRefreshAdvisoryLockKey) + }() + + for _, view := range []string{"mv_logs_hourly", "mv_logs_filterdata"} { + if _, err := conn.ExecContext(ctx, "REFRESH MATERIALIZED VIEW CONCURRENTLY "+view); err != nil { + return fmt.Errorf("failed to refresh %s: %w", view, err) + } + } + return nil +} + +// startMatViewRefresher launches a background goroutine that periodically +// refreshes materialized views. Returns a stop function for graceful shutdown. 
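+// A minimal usage sketch (postgres.go wires this up with a 30s interval):
+//
+//	stop := startMatViewRefresher(ctx, db, 30*time.Second, logger)
+//	defer stop() // or cancel ctx; both terminate the refresh loop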
+func startMatViewRefresher(ctx context.Context, db *gorm.DB, interval time.Duration, logger schemas.Logger) func() { + stopCh := make(chan struct{}) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := refreshMatViews(ctx, db); err != nil { + logger.Warn(fmt.Sprintf("logstore: matview refresh failed: %s", err)) + } + case <-ctx.Done(): + return + case <-stopCh: + return + } + } + }() + return func() { close(stopCh) } +} + +// canUseMatView returns true if the given filters can be served from +// mv_logs_hourly. Per-row filters (content search, metadata, numeric ranges) +// require the raw logs table. +func canUseMatView(f SearchFilters) bool { + return f.ContentSearch == "" && + len(f.MetadataFilters) == 0 && + len(f.RoutingEngineUsed) == 0 && + f.MinLatency == nil && f.MaxLatency == nil && + f.MinTokens == nil && f.MaxTokens == nil && + f.MinCost == nil && f.MaxCost == nil && + !f.MissingCostOnly +} + +// --------------------------------------------------------------------------- +// Mat-view filter helpers +// --------------------------------------------------------------------------- + +// applyMatViewFilters builds WHERE clauses for queries against mv_logs_hourly. +func applyMatViewFilters(q *gorm.DB, f SearchFilters) *gorm.DB { + if f.StartTime != nil { + q = q.Where("hour >= date_trunc('hour', ?::timestamptz)", *f.StartTime) + } + if f.EndTime != nil { + q = q.Where("hour <= ?", *f.EndTime) + } + if len(f.Providers) > 0 { + q = q.Where("provider IN ?", f.Providers) + } + if len(f.Models) > 0 { + q = q.Where("model IN ?", f.Models) + } + if len(f.Status) > 0 { + q = q.Where("status IN ?", f.Status) + } + if len(f.Objects) > 0 { + q = q.Where("object_type IN ?", f.Objects) + } + if len(f.SelectedKeyIDs) > 0 { + q = q.Where("selected_key_id IN ?", f.SelectedKeyIDs) + } + if len(f.VirtualKeyIDs) > 0 { + q = q.Where("virtual_key_id IN ?", f.VirtualKeyIDs) + } + if len(f.RoutingRuleIDs) > 0 { + q = q.Where("routing_rule_id IN ?", f.RoutingRuleIDs) + } + return q +} + +// --------------------------------------------------------------------------- +// Mat-view query methods (called from rdb.go when dialect == "postgres") +// --------------------------------------------------------------------------- + +// getCountFromMatView returns the total number of logs matching the filters +// by summing pre-aggregated counts from mv_logs_hourly. +func (s *RDBLogStore) getCountFromMatView(ctx context.Context, filters SearchFilters) (int64, error) { + var total int64 + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select("COALESCE(SUM(count), 0)").Row().Scan(&total); err != nil { + return 0, err + } + return total, nil +} + +// getStatsFromMatView computes dashboard statistics (total requests, success +// rate, average latency, total tokens, total cost) from mv_logs_hourly. +// Latency is a weighted average across hourly buckets. 
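+// Concretely, for hourly rows i with request counts c_i and per-hour
+// averages a_i, the blended latency is
+//
+//	avg_latency = sum(a_i * c_i) / sum(c_i)
+//
+// which is what the SUM(avg_latency * count) / SUM(count) expression below computes.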
+func (s *RDBLogStore) getStatsFromMatView(ctx context.Context, filters SearchFilters) (*SearchStats, error) { + var result struct { + TotalCount int64 `gorm:"column:total_count"` + SuccessCount int64 `gorm:"column:success_count"` + AvgLatency float64 `gorm:"column:avg_latency"` + TotalTokens int64 `gorm:"column:total_tokens"` + TotalCost float64 `gorm:"column:total_cost"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(` + COALESCE(SUM(count), 0) AS total_count, + COALESCE(SUM(success_count), 0) AS success_count, + CASE WHEN SUM(count) > 0 THEN SUM(avg_latency * count) / SUM(count) ELSE 0 END AS avg_latency, + COALESCE(SUM(total_tokens), 0) AS total_tokens, + COALESCE(SUM(total_cost), 0) AS total_cost + `).Scan(&result).Error; err != nil { + return nil, err + } + + var successRate float64 + if result.TotalCount > 0 { + successRate = float64(result.SuccessCount) / float64(result.TotalCount) * 100 + } + return &SearchStats{ + TotalRequests: result.TotalCount, + SuccessRate: successRate, + AverageLatency: result.AvgLatency, + TotalTokens: result.TotalTokens, + TotalCost: result.TotalCost, + }, nil +} + +// getHistogramFromMatView returns time-bucketed request counts (total, +// success, error) by re-aggregating hourly buckets from mv_logs_hourly. +func (s *RDBLogStore) getHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*HistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Total int64 `gorm:"column:total"` + Success int64 `gorm:"column:success"` + ErrorCount int64 `gorm:"column:error_count"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + SUM(count) AS total, + SUM(success_count) AS success, + SUM(error_count) AS error_count + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp"). + Order("bucket_timestamp ASC"). + Find(&results).Error; err != nil { + return nil, err + } + + resultMap := make(map[int64]*struct{ total, success, errCount int64 }, len(results)) + for _, r := range results { + resultMap[r.BucketTimestamp] = &struct{ total, success, errCount int64 }{r.Total, r.Success, r.ErrorCount} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]HistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := HistogramBucket{Timestamp: time.Unix(ts, 0).UTC()} + if a, ok := resultMap[ts]; ok { + b.Count = a.total + b.Success = a.success + b.Error = a.errCount + } + buckets = append(buckets, b) + } + return &HistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds}, nil +} + +// getTokenHistogramFromMatView returns time-bucketed token usage (prompt, +// completion, total, cached) from mv_logs_hourly. 
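+// Hourly rows are mapped onto output buckets via
+//
+//	bucket_ts = floor(epoch(hour) / bucketSizeSeconds) * bucketSizeSeconds
+//
+// so callers should only route here when bucketSizeSeconds >= 3600 (rdb.go
+// guards for this).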
+func (s *RDBLogStore) getTokenHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*TokenHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + PromptTokens int64 `gorm:"column:prompt_tokens"` + CompletionTokens int64 `gorm:"column:completion_tokens"` + TotalTokens int64 `gorm:"column:total_tkns"` + CachedReadTokens int64 `gorm:"column:cached_read_tokens"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + SUM(total_prompt_tokens) AS prompt_tokens, + SUM(total_completion_tokens) AS completion_tokens, + SUM(total_tokens) AS total_tkns, + SUM(total_cached_read_tokens) AS cached_read_tokens + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp"). + Order("bucket_timestamp ASC"). + Find(&results).Error; err != nil { + return nil, err + } + + resultMap := make(map[int64]int, len(results)) + for i, r := range results { + resultMap[r.BucketTimestamp] = i + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]TokenHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := TokenHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()} + if idx, ok := resultMap[ts]; ok { + r := results[idx] + b.PromptTokens = r.PromptTokens + b.CompletionTokens = r.CompletionTokens + b.TotalTokens = r.TotalTokens + b.CachedReadTokens = r.CachedReadTokens + } + buckets = append(buckets, b) + } + return &TokenHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds}, nil +} + +// getCostHistogramFromMatView returns time-bucketed cost data with per-model +// breakdown from mv_logs_hourly. +func (s *RDBLogStore) getCostHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*CostHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Model string `gorm:"column:model"` + Cost float64 `gorm:"column:cost"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + model, + SUM(total_cost) AS cost + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp, model"). + Order("bucket_timestamp ASC"). 
+ Find(&results).Error; err != nil { + return nil, err + } + + type bucketAgg struct { + totalCost float64 + byModel map[string]float64 + } + grouped := make(map[int64]*bucketAgg) + modelsSet := make(map[string]struct{}) + for _, r := range results { + a, ok := grouped[r.BucketTimestamp] + if !ok { + a = &bucketAgg{byModel: make(map[string]float64)} + grouped[r.BucketTimestamp] = a + } + a.totalCost += r.Cost + a.byModel[r.Model] += r.Cost + modelsSet[r.Model] = struct{}{} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]CostHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := CostHistogramBucket{Timestamp: time.Unix(ts, 0).UTC(), ByModel: make(map[string]float64)} + if a, ok := grouped[ts]; ok { + b.TotalCost = a.totalCost + b.ByModel = a.byModel + } + buckets = append(buckets, b) + } + + models := sortedStringKeys(modelsSet) + return &CostHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds, Models: models}, nil +} + +// getModelHistogramFromMatView returns time-bucketed model usage with +// success/error breakdown per model from mv_logs_hourly. +func (s *RDBLogStore) getModelHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ModelHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Model string `gorm:"column:model"` + Total int64 `gorm:"column:total"` + Success int64 `gorm:"column:success"` + ErrorCount int64 `gorm:"column:error_count"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + model, + SUM(count) AS total, + SUM(success_count) AS success, + SUM(error_count) AS error_count + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp, model"). + Order("bucket_timestamp ASC"). + Find(&results).Error; err != nil { + return nil, err + } + + type bucketAgg struct { + byModel map[string]ModelUsageStats + } + grouped := make(map[int64]*bucketAgg) + modelsSet := make(map[string]struct{}) + for _, r := range results { + a, ok := grouped[r.BucketTimestamp] + if !ok { + a = &bucketAgg{byModel: make(map[string]ModelUsageStats)} + grouped[r.BucketTimestamp] = a + } + existing := a.byModel[r.Model] + existing.Total += r.Total + existing.Success += r.Success + existing.Error += r.ErrorCount + a.byModel[r.Model] = existing + modelsSet[r.Model] = struct{}{} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]ModelHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := ModelHistogramBucket{Timestamp: time.Unix(ts, 0).UTC(), ByModel: make(map[string]ModelUsageStats)} + if a, ok := grouped[ts]; ok { + b.ByModel = a.byModel + } + buckets = append(buckets, b) + } + + models := sortedStringKeys(modelsSet) + return &ModelHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds, Models: models}, nil +} + +// getLatencyHistogramFromMatView returns time-bucketed latency percentiles +// (avg, p90, p95, p99) from mv_logs_hourly. Percentiles are re-aggregated +// across hourly buckets using weighted averages (weighted by request count). 
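+// Note that a count-weighted average of per-hour percentiles only
+// approximates the true percentile over a wider bucket:
+//
+//	p95_bucket ~= sum(p95_hour_i * count_i) / sum(count_i)
+//
+// Exact percentiles would require the raw latency values from the logs table.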
+func (s *RDBLogStore) getLatencyHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + AvgLatency float64 `gorm:"column:avg_lat"` + P90Latency float64 `gorm:"column:p90_lat"` + P95Latency float64 `gorm:"column:p95_lat"` + P99Latency float64 `gorm:"column:p99_lat"` + TotalRequests int64 `gorm:"column:total_requests"` + } + // Weighted average of percentiles across hourly buckets + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + CASE WHEN SUM(count) > 0 THEN SUM(avg_latency * count) / SUM(count) ELSE 0 END AS avg_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p90_latency * count) / SUM(count) ELSE 0 END AS p90_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p95_latency * count) / SUM(count) ELSE 0 END AS p95_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p99_latency * count) / SUM(count) ELSE 0 END AS p99_lat, + SUM(count) AS total_requests + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp"). + Order("bucket_timestamp ASC"). + Find(&results).Error; err != nil { + return nil, err + } + + resultMap := make(map[int64]int, len(results)) + for i, r := range results { + resultMap[r.BucketTimestamp] = i + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]LatencyHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := LatencyHistogramBucket{Timestamp: time.Unix(ts, 0).UTC()} + if idx, ok := resultMap[ts]; ok { + r := results[idx] + b.AvgLatency = r.AvgLatency + b.P90Latency = r.P90Latency + b.P95Latency = r.P95Latency + b.P99Latency = r.P99Latency + b.TotalRequests = r.TotalRequests + } + buckets = append(buckets, b) + } + return &LatencyHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds}, nil +} + +// getProviderCostHistogramFromMatView returns time-bucketed cost data with +// per-provider breakdown from mv_logs_hourly. +func (s *RDBLogStore) getProviderCostHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderCostHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Provider string `gorm:"column:provider"` + Cost float64 `gorm:"column:cost"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + provider, + SUM(total_cost) AS cost + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp, provider"). + Order("bucket_timestamp ASC"). 
+ Find(&results).Error; err != nil { + return nil, err + } + + type bucketAgg struct { + totalCost float64 + byProvider map[string]float64 + } + grouped := make(map[int64]*bucketAgg) + providersSet := make(map[string]struct{}) + for _, r := range results { + a, ok := grouped[r.BucketTimestamp] + if !ok { + a = &bucketAgg{byProvider: make(map[string]float64)} + grouped[r.BucketTimestamp] = a + } + a.totalCost += r.Cost + a.byProvider[r.Provider] += r.Cost + providersSet[r.Provider] = struct{}{} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]ProviderCostHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := ProviderCostHistogramBucket{Timestamp: time.Unix(ts, 0).UTC(), ByProvider: make(map[string]float64)} + if a, ok := grouped[ts]; ok { + b.TotalCost = a.totalCost + b.ByProvider = a.byProvider + } + buckets = append(buckets, b) + } + + providers := sortedStringKeys(providersSet) + return &ProviderCostHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds, Providers: providers}, nil +} + +// getProviderTokenHistogramFromMatView returns time-bucketed token usage with +// per-provider breakdown from mv_logs_hourly. +func (s *RDBLogStore) getProviderTokenHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderTokenHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Provider string `gorm:"column:provider"` + PromptTokens int64 `gorm:"column:prompt_tokens"` + CompletionTokens int64 `gorm:"column:completion_tokens"` + TotalTokens int64 `gorm:"column:total_tkns"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + provider, + SUM(total_prompt_tokens) AS prompt_tokens, + SUM(total_completion_tokens) AS completion_tokens, + SUM(total_tokens) AS total_tkns, + SUM(total_cached_read_tokens) AS cached_read_tokens + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp, provider"). + Order("bucket_timestamp ASC"). 
+ Find(&results).Error; err != nil { + return nil, err + } + + type provAgg struct { + prompt, completion, total int64 + } + type bucketAgg struct { + byProvider map[string]*provAgg + } + grouped := make(map[int64]*bucketAgg) + providersSet := make(map[string]struct{}) + for _, r := range results { + a, ok := grouped[r.BucketTimestamp] + if !ok { + a = &bucketAgg{byProvider: make(map[string]*provAgg)} + grouped[r.BucketTimestamp] = a + } + pa, ok := a.byProvider[r.Provider] + if !ok { + pa = &provAgg{} + a.byProvider[r.Provider] = pa + } + pa.prompt += r.PromptTokens + pa.completion += r.CompletionTokens + pa.total += r.TotalTokens + providersSet[r.Provider] = struct{}{} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]ProviderTokenHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := ProviderTokenHistogramBucket{Timestamp: time.Unix(ts, 0).UTC(), ByProvider: make(map[string]ProviderTokenStats)} + if a, ok := grouped[ts]; ok { + for prov, pa := range a.byProvider { + b.ByProvider[prov] = ProviderTokenStats{ + PromptTokens: pa.prompt, + CompletionTokens: pa.completion, + TotalTokens: pa.total, + } + } + } + buckets = append(buckets, b) + } + + providers := sortedStringKeys(providersSet) + return &ProviderTokenHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds, Providers: providers}, nil +} + +// getProviderLatencyHistogramFromMatView returns time-bucketed latency +// percentiles with per-provider breakdown from mv_logs_hourly. Percentiles +// are re-aggregated using weighted averages. +func (s *RDBLogStore) getProviderLatencyHistogramFromMatView(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) { + var results []struct { + BucketTimestamp int64 `gorm:"column:bucket_timestamp"` + Provider string `gorm:"column:provider"` + AvgLatency float64 `gorm:"column:avg_lat"` + P90Latency float64 `gorm:"column:p90_lat"` + P95Latency float64 `gorm:"column:p95_lat"` + P99Latency float64 `gorm:"column:p99_lat"` + TotalRequests int64 `gorm:"column:total_requests"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(fmt.Sprintf(` + CAST(FLOOR(EXTRACT(EPOCH FROM hour) / %d) * %d AS BIGINT) AS bucket_timestamp, + provider, + CASE WHEN SUM(count) > 0 THEN SUM(avg_latency * count) / SUM(count) ELSE 0 END AS avg_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p90_latency * count) / SUM(count) ELSE 0 END AS p90_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p95_latency * count) / SUM(count) ELSE 0 END AS p95_lat, + CASE WHEN SUM(count) > 0 THEN SUM(p99_latency * count) / SUM(count) ELSE 0 END AS p99_lat, + SUM(count) AS total_requests + `, bucketSizeSeconds, bucketSizeSeconds)). + Group("bucket_timestamp, provider"). + Order("bucket_timestamp ASC"). 
+ Find(&results).Error; err != nil { + return nil, err + } + + type bucketAgg struct { + byProvider map[string]ProviderLatencyStats + } + grouped := make(map[int64]*bucketAgg) + providersSet := make(map[string]struct{}) + for _, r := range results { + a, ok := grouped[r.BucketTimestamp] + if !ok { + a = &bucketAgg{byProvider: make(map[string]ProviderLatencyStats)} + grouped[r.BucketTimestamp] = a + } + a.byProvider[r.Provider] = ProviderLatencyStats{ + AvgLatency: r.AvgLatency, + P90Latency: r.P90Latency, + P95Latency: r.P95Latency, + P99Latency: r.P99Latency, + TotalRequests: r.TotalRequests, + } + providersSet[r.Provider] = struct{}{} + } + + allTimestamps := generateBucketTimestamps(filters.StartTime, filters.EndTime, bucketSizeSeconds) + buckets := make([]ProviderLatencyHistogramBucket, 0, len(allTimestamps)) + for _, ts := range allTimestamps { + b := ProviderLatencyHistogramBucket{Timestamp: time.Unix(ts, 0).UTC(), ByProvider: make(map[string]ProviderLatencyStats)} + if a, ok := grouped[ts]; ok { + b.ByProvider = a.byProvider + } + buckets = append(buckets, b) + } + + providers := sortedStringKeys(providersSet) + return &ProviderLatencyHistogramResult{Buckets: buckets, BucketSizeSeconds: bucketSizeSeconds, Providers: providers}, nil +} + +// getModelRankingsFromMatView returns models ranked by usage with trend +// comparison to the previous period of equal duration from mv_logs_hourly. +func (s *RDBLogStore) getModelRankingsFromMatView(ctx context.Context, filters SearchFilters) (*ModelRankingResult, error) { + var results []struct { + Model string `gorm:"column:model"` + Provider string `gorm:"column:provider"` + Total int64 `gorm:"column:total"` + SuccessCount int64 `gorm:"column:success_count"` + AvgLatency float64 `gorm:"column:avg_lat"` + TotalTokens int64 `gorm:"column:total_tkns"` + TotalCost float64 `gorm:"column:total_cost"` + } + q := s.db.WithContext(ctx).Table("mv_logs_hourly") + q = applyMatViewFilters(q, filters) + if err := q.Select(` + model, provider, + SUM(count) AS total, + SUM(success_count) AS success_count, + CASE WHEN SUM(count) > 0 THEN SUM(avg_latency * count) / SUM(count) ELSE 0 END AS avg_lat, + SUM(total_tokens) AS total_tkns, + SUM(total_cost) AS total_cost + `).Group("model, provider"). + Order("total DESC"). 
+ Find(&results).Error; err != nil { + return nil, err + } + + // Previous period for trend (same duration, ending just before current start) + type prevRow struct { + Model string `gorm:"column:model"` + Provider string `gorm:"column:provider"` + Total int64 `gorm:"column:total"` + AvgLatency float64 `gorm:"column:avg_lat"` + TotalTokens int64 `gorm:"column:total_tkns"` + TotalCost float64 `gorm:"column:total_cost"` + } + var prevResults []prevRow + if filters.StartTime != nil && filters.EndTime != nil { + duration := filters.EndTime.Sub(*filters.StartTime) + prevStart := filters.StartTime.Add(-duration) + prevEnd := filters.StartTime.Add(-time.Nanosecond) + prevFilters := filters + prevFilters.StartTime = &prevStart + prevFilters.EndTime = &prevEnd + pq := s.db.WithContext(ctx).Table("mv_logs_hourly") + pq = applyMatViewFilters(pq, prevFilters) + if err := pq.Select(` + model, provider, + SUM(count) AS total, + CASE WHEN SUM(count) > 0 THEN SUM(avg_latency * count) / SUM(count) ELSE 0 END AS avg_lat, + SUM(total_tokens) AS total_tkns, + SUM(total_cost) AS total_cost + `).Group("model, provider").Find(&prevResults).Error; err != nil { + return nil, fmt.Errorf("failed to get previous period rankings: %w", err) + } + } + // Key by model+provider to match current period granularity + type rankingKey struct{ model, provider string } + prevMap := make(map[rankingKey]int, len(prevResults)) + for i, r := range prevResults { + prevMap[rankingKey{r.Model, r.Provider}] = i + } + + rankings := make([]ModelRankingWithTrend, 0, len(results)) + for _, r := range results { + var successRate float64 + if r.Total > 0 { + successRate = float64(r.SuccessCount) / float64(r.Total) * 100 + } + entry := ModelRankingEntry{ + Model: r.Model, + Provider: r.Provider, + TotalRequests: r.Total, + SuccessCount: r.SuccessCount, + SuccessRate: successRate, + TotalTokens: r.TotalTokens, + TotalCost: r.TotalCost, + AvgLatency: r.AvgLatency, + } + mrt := ModelRankingWithTrend{ModelRankingEntry: entry} + if idx, ok := prevMap[rankingKey{r.Model, r.Provider}]; ok { + prev := prevResults[idx] + mrt.Trend = ModelRankingTrend{ + HasPreviousPeriod: true, + RequestsTrend: trendPct(float64(r.Total), float64(prev.Total)), + TokensTrend: trendPct(float64(r.TotalTokens), float64(prev.TotalTokens)), + CostTrend: trendPct(r.TotalCost, prev.TotalCost), + LatencyTrend: trendPct(r.AvgLatency, prev.AvgLatency), + } + } + rankings = append(rankings, mrt) + } + return &ModelRankingResult{Rankings: rankings}, nil +} + +// --------------------------------------------------------------------------- +// Filterdata from mat view +// --------------------------------------------------------------------------- + +// getDistinctModelsFromMatView returns unique model names from mv_logs_filterdata. +func (s *RDBLogStore) getDistinctModelsFromMatView(ctx context.Context) ([]string, error) { + var models []string + if err := s.db.WithContext(ctx).Table("mv_logs_filterdata"). + Distinct("model"). + Pluck("model", &models).Error; err != nil { + return nil, err + } + return models, nil +} + +// getDistinctKeyPairsFromMatView returns unique ID-Name pairs for the given +// columns (e.g. selected_key_id/name, virtual_key_id/name) from mv_logs_filterdata. +func (s *RDBLogStore) getDistinctKeyPairsFromMatView(ctx context.Context, idCol, nameCol string) ([]KeyPairResult, error) { + var results []KeyPairResult + if err := s.db.WithContext(ctx).Table("mv_logs_filterdata"). + Select(fmt.Sprintf("DISTINCT %s AS id, %s AS name", idCol, nameCol)). 
+ Where(fmt.Sprintf("%s IS NOT NULL AND %s != ''", idCol, idCol)). + Find(&results).Error; err != nil { + return nil, err + } + return results, nil +} + +// getDistinctRoutingEnginesFromMatView returns unique routing engine names by +// parsing the comma-separated routing_engines_used values from mv_logs_filterdata. +func (s *RDBLogStore) getDistinctRoutingEnginesFromMatView(ctx context.Context) ([]string, error) { + var rawValues []string + if err := s.db.WithContext(ctx).Table("mv_logs_filterdata"). + Distinct("routing_engines_used"). + Where("routing_engines_used != ''"). + Pluck("routing_engines_used", &rawValues).Error; err != nil { + return nil, err + } + seen := make(map[string]struct{}) + for _, raw := range rawValues { + for _, eng := range strings.Split(raw, ",") { + eng = strings.TrimSpace(eng) + if eng != "" { + seen[eng] = struct{}{} + } + } + } + return sortedStringKeys(seen), nil +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +// sortedStringKeys returns the keys of a set map in sorted order. +func sortedStringKeys(m map[string]struct{}) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} + +// trendPct computes the percentage change from previous to current. +// Returns 0 when the previous value is zero (no basis for comparison). +func trendPct(current, previous float64) float64 { + if previous == 0 { + return 0 + } + return ((current - previous) / previous) * 100 +} diff --git a/framework/logstore/migrations.go b/framework/logstore/migrations.go index fc620f336f..9246a3bdeb 100644 --- a/framework/logstore/migrations.go +++ b/framework/logstore/migrations.go @@ -36,6 +36,10 @@ const ( // dashboardEnhancementsAdvisoryLockKey serializes the background dashboard // enhancements work (backfill + covering index rebuild) across cluster nodes. dashboardEnhancementsAdvisoryLockKey = 1000004 + + // matviewRefreshAdvisoryLockKey serializes periodic materialized view + // refreshes across cluster nodes so only one replica refreshes at a time. + matviewRefreshAdvisoryLockKey = 1000005 ) // advisoryLock holds a dedicated connection and the advisory lock key. @@ -1798,7 +1802,7 @@ func migrationAddMetadataGINIndex(ctx context.Context, db *gorm.DB) error { if err := tx.Exec("UPDATE logs SET metadata = NULL WHERE metadata IS NOT NULL AND metadata IS NOT JSON OBJECT").Error; err != nil { return fmt.Errorf("failed to clean invalid metadata values: %w", err) } - } else { + } else { // Go-based batch validation for PostgreSQL < 16. type metadataRow struct { ID string @@ -1836,7 +1840,7 @@ func migrationAddMetadataGINIndex(ctx context.Context, db *gorm.DB) error { break } } - } + } } return nil }, diff --git a/framework/logstore/postgres.go b/framework/logstore/postgres.go index 6dccaa05c1..688433876f 100644 --- a/framework/logstore/postgres.go +++ b/framework/logstore/postgres.go @@ -3,6 +3,7 @@ package logstore import ( "context" "fmt" + "time" "github.com/maximhq/bifrost/core/schemas" @@ -107,25 +108,35 @@ func newPostgresLogStore(ctx context.Context, config *PostgresConfig, logger sch logger.Info("logstore: metadata GIN index is ready") }() - // Ensure performance GIN indexes (trigram for content search, array for - // routing engines) exist and are valid. Same non-blocking pattern as above. 
+ // Run dashboard enhancements first (backfill + covering index rebuild), + // then performance indexes second, in a single goroutine to avoid + // deadlocks from concurrent DDL on the same table. go func() { + if err := ensureDashboardEnhancements(context.Background(), db); err != nil { + logger.Warn(fmt.Sprintf("logstore: dashboard enhancements failed: %s (dashboard will still work with partial data)", err)) + } else { + logger.Info("logstore: dashboard enhancements completed") + } + if err := ensurePerformanceIndexes(context.Background(), db); err != nil { logger.Warn(fmt.Sprintf("logstore: performance index build failed: %s (queries will still work without the indexes)", err)) - return + } else { + logger.Info("logstore: performance indexes are ready") } - logger.Info("logstore: performance indexes are ready") }() - // Run the expensive dashboard enhancements (backfill cached_read_tokens, - // rebuild covering index, create MCP covering index) in a background - // goroutine so pod startup is not blocked on large tables. + // Create materialized views and start periodic refresh for dashboard queries. go func() { - if err := ensureDashboardEnhancements(context.Background(), db); err != nil { - logger.Warn(fmt.Sprintf("logstore: dashboard enhancements failed: %s (dashboard will still work with partial data)", err)) + if err := ensureMatViews(context.Background(), db); err != nil { + logger.Warn(fmt.Sprintf("logstore: matview creation failed: %s (dashboard queries will use raw tables)", err)) return } - logger.Info("logstore: dashboard enhancements completed") + if err := refreshMatViews(context.Background(), db); err != nil { + logger.Warn(fmt.Sprintf("logstore: initial matview refresh failed: %s", err)) + } else { + logger.Info("logstore: materialized views are ready") + } + startMatViewRefresher(context.Background(), db, 30*time.Second, logger) }() return d, nil diff --git a/framework/logstore/rdb.go b/framework/logstore/rdb.go index 412f5f731c..f77d5b4701 100644 --- a/framework/logstore/rdb.go +++ b/framework/logstore/rdb.go @@ -15,6 +15,7 @@ import ( "github.com/bytedance/sonic" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/framework/configstore/tables" + "golang.org/x/sync/errgroup" "gorm.io/gorm" "gorm.io/gorm/clause" ) @@ -353,19 +354,7 @@ func (s *RDBLogStore) bulkUpdateCostPostgres(ctx context.Context, updates map[st // SearchLogs searches for logs in the database without calculating statistics. func (s *RDBLogStore) SearchLogs(ctx context.Context, filters SearchFilters, pagination PaginationOptions) (*SearchResult, error) { - var err error - baseQuery := s.db.WithContext(ctx).Model(&Log{}) - - // Apply filters efficiently - baseQuery = s.applyFilters(baseQuery, filters) - - // Get total count for pagination - var totalCount int64 - if err := baseQuery.Count(&totalCount).Error; err != nil { - return nil, err - } - - // Build order clause + // Build order clause up front (needed by the data goroutine). direction := "DESC" if pagination.Order == "asc" { direction = "ASC" @@ -385,39 +374,52 @@ func (s *RDBLogStore) SearchLogs(ctx context.Context, filters SearchFilters, pag orderClause = "timestamp " + direction } - // Execute main query with sorting and pagination. - // Use an explicit SELECT to omit large output/detail TEXT columns and to - // extract only the last element from input history JSON arrays via SQL — - // the table view only renders the last message, so the full conversation - // never needs to be loaded into Go memory. 
- var logs []Log - mainQuery := baseQuery.Order(orderClause).Select(s.listSelectColumns()) - limit := pagination.Limit if limit <= 0 || limit > defaultMaxSearchLimit { limit = defaultMaxSearchLimit } pagination.Limit = limit - mainQuery = mainQuery.Limit(limit) - if pagination.Offset > 0 { - mainQuery = mainQuery.Offset(pagination.Offset) - } - if err = mainQuery.Find(&logs).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return &SearchResult{ - Logs: logs, - Pagination: pagination, - Stats: SearchStats{ - TotalRequests: totalCount, - }, - }, nil + // Run COUNT and data fetch concurrently — the COUNT on large tables is the + // bottleneck, so overlapping it with the (fast) data query halves wall time. + // Each goroutine builds its own *gorm.DB because Count() mutates the session. + var totalCount int64 + var logs []Log + + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) { + var err error + totalCount, err = s.getCountFromMatView(gCtx, filters) + return err } + countQuery := s.db.WithContext(gCtx).Model(&Log{}) + countQuery = s.applyFilters(countQuery, filters) + return countQuery.Count(&totalCount).Error + }) + + g.Go(func() error { + dataQuery := s.db.WithContext(gCtx).Model(&Log{}) + dataQuery = s.applyFilters(dataQuery, filters) + dataQuery = dataQuery.Order(orderClause).Select(s.listSelectColumns()).Limit(limit) + if pagination.Offset > 0 { + dataQuery = dataQuery.Offset(pagination.Offset) + } + err := dataQuery.Find(&logs).Error + if err != nil && errors.Is(err, gorm.ErrRecordNotFound) { + return nil + } + return err + }) + + if err := g.Wait(); err != nil { return nil, err } hasLogs := len(logs) > 0 if !hasLogs { + var err error hasLogs, err = s.HasLogs(ctx) if err != nil { return nil, err @@ -475,6 +477,9 @@ func (s *RDBLogStore) listSelectColumns() string { // GetStats calculates statistics for logs matching the given filters. func (s *RDBLogStore) GetStats(ctx context.Context, filters SearchFilters) (*SearchStats, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) { + return s.getStatsFromMatView(ctx, filters) + } baseQuery := s.db.WithContext(ctx).Model(&Log{}) baseQuery = s.applyFilters(baseQuery, filters) @@ -532,6 +537,9 @@ func (s *RDBLogStore) GetStats(ctx context.Context, filters SearchFilters) (*Sea // GetHistogram returns time-bucketed request counts for the given filters. func (s *RDBLogStore) GetHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*HistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 // Default to 1 hour } @@ -655,6 +663,9 @@ func (s *RDBLogStore) GetHistogram(ctx context.Context, filters SearchFilters, b // GetTokenHistogram returns time-bucketed token usage for the given filters. 
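+// On PostgreSQL, aggregate-friendly filters with bucket sizes of an hour or
+// more are served from mv_logs_hourly; other dialects and per-row filters
+// fall back to scanning the logs table.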
func (s *RDBLogStore) GetTokenHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*TokenHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getTokenHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 // Default to 1 hour } @@ -778,6 +789,9 @@ func (s *RDBLogStore) GetTokenHistogram(ctx context.Context, filters SearchFilte // GetCostHistogram returns time-bucketed cost data with model breakdown for the given filters. func (s *RDBLogStore) GetCostHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*CostHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getCostHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 // Default to 1 hour } @@ -897,6 +911,9 @@ func (s *RDBLogStore) GetCostHistogram(ctx context.Context, filters SearchFilter // GetModelHistogram returns time-bucketed model usage with success/error breakdown for the given filters. func (s *RDBLogStore) GetModelHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ModelHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getModelHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 // Default to 1 hour } @@ -1049,6 +1066,9 @@ func computePercentile(sorted []float64, p float64) float64 { // PostgreSQL uses database-level percentile_cont aggregation (returns 1 row per bucket). // MySQL and SQLite fall back to Go-based percentile computation (loads individual latency values). func (s *RDBLogStore) GetLatencyHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*LatencyHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getLatencyHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 } @@ -1260,6 +1280,9 @@ func (s *RDBLogStore) buildLatencyHistogramResult(computedBuckets map[int64]Late // GetModelRankings returns models ranked by usage with trend comparison to the previous period. func (s *RDBLogStore) GetModelRankings(ctx context.Context, filters SearchFilters) (*ModelRankingResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) { + return s.getModelRankingsFromMatView(ctx, filters) + } selectClause := ` model, provider, @@ -1405,6 +1428,9 @@ func pctChange(old, new float64) float64 { // GetProviderCostHistogram returns time-bucketed cost data with provider breakdown for the given filters. func (s *RDBLogStore) GetProviderCostHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderCostHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getProviderCostHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 } @@ -1513,6 +1539,9 @@ func (s *RDBLogStore) GetProviderCostHistogram(ctx context.Context, filters Sear // GetProviderTokenHistogram returns time-bucketed token usage with provider breakdown for the given filters. 
func (s *RDBLogStore) GetProviderTokenHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderTokenHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getProviderTokenHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 } @@ -1637,6 +1666,9 @@ func (s *RDBLogStore) GetProviderTokenHistogram(ctx context.Context, filters Sea // PostgreSQL uses database-level percentile_cont aggregation. // MySQL and SQLite fall back to Go-based percentile computation. func (s *RDBLogStore) GetProviderLatencyHistogram(ctx context.Context, filters SearchFilters, bucketSizeSeconds int64) (*ProviderLatencyHistogramResult, error) { + if s.db.Dialector.Name() == "postgres" && canUseMatView(filters) && bucketSizeSeconds >= 3600 { + return s.getProviderLatencyHistogramFromMatView(ctx, filters, bucketSizeSeconds) + } if bucketSizeSeconds <= 0 { bucketSizeSeconds = 3600 } @@ -1953,6 +1985,9 @@ func (s *RDBLogStore) Flush(ctx context.Context, since time.Time) error { // GetDistinctModels returns all unique non-empty model values using SELECT DISTINCT. // Scoped to recent data to avoid full table scans. func (s *RDBLogStore) GetDistinctModels(ctx context.Context) ([]string, error) { + if s.db.Dialector.Name() == "postgres" { + return s.getDistinctModelsFromMatView(ctx) + } cutoff := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays) var models []string err := s.db.WithContext(ctx).Model(&Log{}). @@ -1978,6 +2013,9 @@ var allowedKeyPairColumns = map[string]struct{}{ // GetDistinctKeyPairs returns unique non-empty ID-Name pairs for the given columns using SELECT DISTINCT. // idCol and nameCol must be valid column names (e.g., "selected_key_id", "selected_key_name"). func (s *RDBLogStore) GetDistinctKeyPairs(ctx context.Context, idCol, nameCol string) ([]KeyPairResult, error) { + if s.db.Dialector.Name() == "postgres" { + return s.getDistinctKeyPairsFromMatView(ctx, idCol, nameCol) + } if _, ok := allowedKeyPairColumns[idCol]; !ok { return nil, fmt.Errorf("invalid id column: %s", idCol) } @@ -2000,6 +2038,9 @@ func (s *RDBLogStore) GetDistinctKeyPairs(ctx context.Context, idCol, nameCol st // GetDistinctRoutingEngines returns all unique routing engine values from the comma-separated column. // Scoped to recent data to avoid full table scans. func (s *RDBLogStore) GetDistinctRoutingEngines(ctx context.Context) ([]string, error) { + if s.db.Dialector.Name() == "postgres" { + return s.getDistinctRoutingEnginesFromMatView(ctx) + } cutoff := time.Now().UTC().AddDate(0, 0, -defaultFilterDataCutoffDays) var rawValues []string err := s.db.WithContext(ctx).Model(&Log{}). 
diff --git a/framework/logstore/sqlite.go b/framework/logstore/sqlite.go index 7497a42e5b..f413c12aaa 100644 --- a/framework/logstore/sqlite.go +++ b/framework/logstore/sqlite.go @@ -42,5 +42,6 @@ func newSqliteLogStore(ctx context.Context, config *SQLiteConfig, logger schemas if err := triggerMigrations(ctx, db); err != nil { return nil, err } + return s, nil } diff --git a/ui/app/workspace/dashboard/page.tsx b/ui/app/workspace/dashboard/page.tsx index 14bb8dd955..9d41927611 100644 --- a/ui/app/workspace/dashboard/page.tsx +++ b/ui/app/workspace/dashboard/page.tsx @@ -276,16 +276,13 @@ export default function DashboardPage() { const providerTokenProviders = useMemo(() => sanitizeSeriesLabels(providerTokenData?.providers), [providerTokenData?.providers]); const providerLatencyProviders = useMemo(() => sanitizeSeriesLabels(providerLatencyData?.providers), [providerLatencyData?.providers]); - // Fetch all Overview data + // Fetch Overview tab data (5 calls) const fetchOverviewData = useCallback(async () => { setLoadingHistogram(true); setLoadingTokens(true); setLoadingCost(true); setLoadingModels(true); setLoadingLatency(true); - setLoadingProviderCost(true); - setLoadingProviderTokens(true); - setLoadingProviderLatency(true); const fetchFilters = { filters }; @@ -295,18 +292,12 @@ export default function DashboardPage() { costResult, modelResult, latencyResult, - providerCostResult, - providerTokenResult, - providerLatencyResult, ] = await Promise.all([ triggerHistogram(fetchFilters, false), triggerTokens(fetchFilters, false), triggerCost(fetchFilters, false), triggerModels(fetchFilters, false), triggerLatency(fetchFilters, false), - triggerProviderCost(fetchFilters, false), - triggerProviderTokens(fetchFilters, false), - triggerProviderLatency(fetchFilters, false), ]); setHistogramData(histogramResult.data ?? null); @@ -319,6 +310,33 @@ export default function DashboardPage() { setLoadingModels(false); setLatencyData(latencyResult.data ?? null); setLoadingLatency(false); + }, [ + filters, + triggerHistogram, + triggerTokens, + triggerCost, + triggerModels, + triggerLatency, + ]); + + // Fetch Provider Usage tab data (3 calls) + const fetchProviderData = useCallback(async () => { + setLoadingProviderCost(true); + setLoadingProviderTokens(true); + setLoadingProviderLatency(true); + + const fetchFilters = { filters }; + + const [ + providerCostResult, + providerTokenResult, + providerLatencyResult, + ] = await Promise.all([ + triggerProviderCost(fetchFilters, false), + triggerProviderTokens(fetchFilters, false), + triggerProviderLatency(fetchFilters, false), + ]); + setProviderCostData(providerCostResult.data ?? null); setLoadingProviderCost(false); setProviderTokenData(providerTokenResult.data ?? null); @@ -327,11 +345,6 @@ export default function DashboardPage() { setLoadingProviderLatency(false); }, [ filters, - triggerHistogram, - triggerTokens, - triggerCost, - triggerModels, - triggerLatency, triggerProviderCost, triggerProviderTokens, triggerProviderLatency, @@ -339,7 +352,6 @@ export default function DashboardPage() { // Fetch MCP data const fetchMcpData = useCallback(async () => { - const gen = mcpFetchGenRef.current; setLoadingMcpHistogram(true); setLoadingMcpCost(true); setLoadingMcpTopTools(true); @@ -352,8 +364,6 @@ export default function DashboardPage() { triggerMcpTopTools(fetchFilters, false), ]); - if (gen !== mcpFetchGenRef.current) return; - setMcpHistogramData(mcpHistResult.data ?? null); setLoadingMcpHistogram(false); setMcpCostData(mcpCostResult.data ?? 
null); @@ -364,104 +374,117 @@ export default function DashboardPage() { // Fetch Rankings data const fetchRankingsData = useCallback(async () => { - const gen = rankingsFetchGenRef.current; setLoadingRankings(true); const result = await triggerRankings({ filters }, false); - if (gen !== rankingsFetchGenRef.current) return; setRankingsData(result.data ?? null); setLoadingRankings(false); }, [filters, triggerRankings]); - // Track whether MCP data has been fetched for the current filter set - const mcpDataFetchedRef = useRef(false); - const mcpDataLoadingRef = useRef(false); - const mcpFetchGenRef = useRef(0); + // --- Lazy-load refs: each tab fetches only once per filter change --- + const overviewFetchedRef = useRef(false); + const overviewLoadingRef = useRef(false); + const overviewGenRef = useRef(0); - // Reset MCP loaded-flag when MCP-specific filters change - useEffect(() => { - mcpDataFetchedRef.current = false; - mcpDataLoadingRef.current = false; - mcpFetchGenRef.current += 1; - }, [mcpFilters]); + const providerFetchedRef = useRef(false); + const providerLoadingRef = useRef(false); + const providerGenRef = useRef(0); - const ensureMcpDataLoaded = useCallback(async () => { - if (mcpDataFetchedRef.current || mcpDataLoadingRef.current) { - return; + const mcpFetchedRef = useRef(false); + const mcpLoadingRef = useRef(false); + const mcpGenRef = useRef(0); + + const rankingsFetchedRef = useRef(false); + const rankingsLoadingRef = useRef(false); + const rankingsGenRef = useRef(0); + + const ensureOverviewDataLoaded = useCallback(async () => { + if (overviewFetchedRef.current || overviewLoadingRef.current) return; + const gen = overviewGenRef.current; + overviewLoadingRef.current = true; + try { + await fetchOverviewData(); + if (gen === overviewGenRef.current) overviewFetchedRef.current = true; + } finally { + if (gen === overviewGenRef.current) overviewLoadingRef.current = false; } + }, [fetchOverviewData]); - const gen = mcpFetchGenRef.current; - mcpDataLoadingRef.current = true; + const ensureProviderDataLoaded = useCallback(async () => { + if (providerFetchedRef.current || providerLoadingRef.current) return; + const gen = providerGenRef.current; + providerLoadingRef.current = true; + try { + await fetchProviderData(); + if (gen === providerGenRef.current) providerFetchedRef.current = true; + } finally { + if (gen === providerGenRef.current) providerLoadingRef.current = false; + } + }, [fetchProviderData]); + + const ensureMcpDataLoaded = useCallback(async () => { + if (mcpFetchedRef.current || mcpLoadingRef.current) return; + const gen = mcpGenRef.current; + mcpLoadingRef.current = true; try { await fetchMcpData(); - if (gen === mcpFetchGenRef.current) { - mcpDataFetchedRef.current = true; - } + if (gen === mcpGenRef.current) mcpFetchedRef.current = true; } finally { - if (gen === mcpFetchGenRef.current) { - mcpDataLoadingRef.current = false; - } + if (gen === mcpGenRef.current) mcpLoadingRef.current = false; } }, [fetchMcpData]); - // Track whether Rankings data has been fetched for the current filter set - const rankingsDataFetchedRef = useRef(false); - const rankingsDataLoadingRef = useRef(false); - const rankingsFetchGenRef = useRef(0); - const ensureRankingsDataLoaded = useCallback(async () => { - if (rankingsDataFetchedRef.current || rankingsDataLoadingRef.current) { - return; - } - - const gen = rankingsFetchGenRef.current; - rankingsDataLoadingRef.current = true; + if (rankingsFetchedRef.current || rankingsLoadingRef.current) return; + const gen = rankingsGenRef.current; + 
     rankingsLoadingRef.current = true;
     try {
       await fetchRankingsData();
-      if (gen === rankingsFetchGenRef.current) {
-        rankingsDataFetchedRef.current = true;
-      }
+      if (gen === rankingsGenRef.current) rankingsFetchedRef.current = true;
     } finally {
-      if (gen === rankingsFetchGenRef.current) {
-        rankingsDataLoadingRef.current = false;
-      }
+      if (gen === rankingsGenRef.current) rankingsLoadingRef.current = false;
     }
   }, [fetchRankingsData]);
 
-  // Fetch overview data on mount and when filters change
+  // Reset all lazy-load flags when filters change (not on tab switch)
   useEffect(() => {
-    fetchOverviewData();
-    rankingsDataFetchedRef.current = false;
-    rankingsDataLoadingRef.current = false;
-    rankingsFetchGenRef.current += 1;
-  }, [fetchOverviewData]);
+    overviewFetchedRef.current = false;
+    overviewLoadingRef.current = false;
+    overviewGenRef.current += 1;
+    providerFetchedRef.current = false;
+    providerLoadingRef.current = false;
+    providerGenRef.current += 1;
+    rankingsFetchedRef.current = false;
+    rankingsLoadingRef.current = false;
+    rankingsGenRef.current += 1;
+  }, [filters]);
 
-  // Warm MCP and Rankings data in the background so switching tabs feels instant
   useEffect(() => {
-    if (urlState.tab === "mcp" || urlState.tab === "rankings") {
-      return;
-    }
+    mcpFetchedRef.current = false;
+    mcpLoadingRef.current = false;
+    mcpGenRef.current += 1;
+  }, [mcpFilters]);
+
+  // Fetch current tab's data when filters change or tab switches
+  // The ensure* functions are no-ops if data is already loaded for the current filters
+  useEffect(() => {
+    const tab = urlState.tab || "overview";
+    if (tab === "overview") void ensureOverviewDataLoaded();
+    else if (tab === "provider-usage") void ensureProviderDataLoaded();
+    else if (tab === "rankings") void ensureRankingsDataLoaded();
+    else if (tab === "mcp") void ensureMcpDataLoaded();
+  }, [urlState.tab, ensureOverviewDataLoaded, ensureProviderDataLoaded, ensureRankingsDataLoaded, ensureMcpDataLoaded]);
+
+  // Warm other tabs in the background after 150ms
+  useEffect(() => {
+    const tab = urlState.tab || "overview";
     const timeoutId = window.setTimeout(() => {
-      void ensureMcpDataLoaded();
-      void ensureRankingsDataLoaded();
+      if (tab !== "overview") void ensureOverviewDataLoaded();
+      if (tab !== "provider-usage") void ensureProviderDataLoaded();
+      if (tab !== "mcp") void ensureMcpDataLoaded();
+      if (tab !== "rankings") void ensureRankingsDataLoaded();
     }, 150);
-    return () => window.clearTimeout(timeoutId);
-  }, [urlState.tab, ensureMcpDataLoaded, ensureRankingsDataLoaded]);
-
-  // Fetch MCP data immediately when tab switches to MCP
-  useEffect(() => {
-    if (urlState.tab === "mcp") {
-      void ensureMcpDataLoaded();
-    }
-  }, [urlState.tab, ensureMcpDataLoaded]);
-
-  // Fetch Rankings data immediately when tab switches to Rankings
-  useEffect(() => {
-    if (urlState.tab === "rankings") {
-      void ensureRankingsDataLoaded();
-    }
-  }, [urlState.tab, ensureRankingsDataLoaded]);
+    return () => window.clearTimeout(timeoutId);
+  }, [urlState.tab, ensureOverviewDataLoaded, ensureProviderDataLoaded, ensureMcpDataLoaded, ensureRankingsDataLoaded]);
 
   // Handle time period change
   const handlePeriodChange = useCallback(
diff --git a/ui/components/filters/filterPopover.tsx b/ui/components/filters/filterPopover.tsx
index 37352efff4..7355cc954d 100644
--- a/ui/components/filters/filterPopover.tsx
+++ b/ui/components/filters/filterPopover.tsx
@@ -31,20 +31,32 @@ export function FilterPopover({ filters, onFilterChange, onMetadataFilterChange,
   const availableRoutingEngines = filterData?.routing_engines || [];
   const availableMetadataKeys = filterData?.metadata_keys || {};
 
-  // Create mappings from name to ID for keys, virtual keys, and routing rules
-  const selectedKeyNameToId = new Map(availableSelectedKeys.map((key) => [key.name, key.id]));
-  const virtualKeyNameToId = new Map(availableVirtualKeys.map((key) => [key.name, key.id]));
-  const routingRuleNameToId = new Map(availableRoutingRules.map((rule) => [rule.name, rule.id]));
+  // Create mappings from name to ALL matching IDs (handles duplicate names from deleted keys)
+  const groupByName = (items: { name: string; id: string }[]) => {
+    const map = new Map<string, string[]>();
+    for (const item of items) {
+      const ids = map.get(item.name) || [];
+      ids.push(item.id);
+      map.set(item.name, ids);
+    }
+    return map;
+  };
+  const selectedKeyNameToIds = groupByName(availableSelectedKeys);
+  const virtualKeyNameToIds = groupByName(availableVirtualKeys);
+  const routingRuleNameToIds = groupByName(availableRoutingRules);
+
+  // Deduplicate by name to avoid React key collisions (e.g. multiple deleted keys with the same name)
+  const dedup = (items: { name: string }[]) => [...new Map(items.map((i) => [i.name, i])).values()].map((i) => i.name);
 
   const FILTER_OPTIONS: Record<string, string[]> = {
     Status: [...Statuses],
     Providers: providersLoading ? [] : availableProviders.map((provider) => provider.name),
     Type: [...RequestTypes],
     Models: filterDataLoading ? [] : availableModels,
-    "Selected Keys": filterDataLoading ? [] : availableSelectedKeys.map((key) => key.name),
-    "Virtual Keys": filterDataLoading ? [] : availableVirtualKeys.map((key) => key.name),
+    "Selected Keys": filterDataLoading ? [] : dedup(availableSelectedKeys),
+    "Virtual Keys": filterDataLoading ? [] : dedup(availableVirtualKeys),
     "Routing Engines": filterDataLoading ? [] : availableRoutingEngines,
-    "Routing Rules": filterDataLoading ? [] : availableRoutingRules.map((rule) => rule.name),
+    "Routing Rules": filterDataLoading ? [] : dedup(availableRoutingRules),
   };
 
   // Add dynamic metadata categories
@@ -67,11 +79,12 @@ export function FilterPopover({ filters, onFilterChange, onMetadataFilterChange,
     "Routing Engines": "routing_engine_used",
   };
 
-  const resolveValueForCategory = (category: string, value: string): string => {
-    if (category === "Selected Keys") return selectedKeyNameToId.get(value) || value;
-    if (category === "Virtual Keys") return virtualKeyNameToId.get(value) || value;
-    if (category === "Routing Rules") return routingRuleNameToId.get(value) || value;
-    return value;
+  // Resolves a display name to all matching IDs for key/rule categories
+  const resolveValuesForCategory = (category: string, value: string): string[] => {
+    if (category === "Selected Keys") return selectedKeyNameToIds.get(value) || [value];
+    if (category === "Virtual Keys") return virtualKeyNameToIds.get(value) || [value];
+    if (category === "Routing Rules") return routingRuleNameToIds.get(value) || [value];
+    return [value];
   };
 
   const handleFilterSelect = (category: string, value: string) => {
@@ -105,12 +118,14 @@ export function FilterPopover({ filters, onFilterChange, onMetadataFilterChange,
     }
 
     const filterKey = filterKeyMap[category];
-    const resolved = resolveValueForCategory(category, value);
+    const resolvedIds = resolveValuesForCategory(category, value);
     const currentValues = (filters[filterKey] as string[]) || [];
-    const newValues = currentValues.includes(resolved)
-      ? currentValues.filter((v) => v !== resolved)
-      : [...currentValues, resolved];
+    // Check if ALL resolved IDs are already selected (toggle all together)
+    const allSelected = resolvedIds.every((id) => currentValues.includes(id));
+    const newValues = allSelected
+      ? currentValues.filter((v) => !resolvedIds.includes(v))
+      : [...currentValues, ...resolvedIds.filter((id) => !currentValues.includes(id))];
 
     onFilterChange(filterKey, newValues);
   };
 
@@ -124,9 +139,26 @@ export function FilterPopover({ filters, onFilterChange, onMetadataFilterChange,
 
     const filterKey = filterKeyMap[category];
     const currentValues = filters[filterKey];
-    const resolved = resolveValueForCategory(category, value);
+    const resolvedIds = resolveValuesForCategory(category, value);
+
+    return Array.isArray(currentValues) && resolvedIds.every((id) => currentValues.includes(id));
+  };
 
-    return Array.isArray(currentValues) && currentValues.includes(resolved);
+  // Count unique visible names for ID-based categories (avoids inflated badge when
+  // multiple backing IDs share the same display name due to deduplication).
+  const countUniqueNames = (ids: string[], nameToIds: Map<string, string[]>): number => {
+    const seen = new Set<string>();
+    for (const [name, mappedIds] of nameToIds) {
+      if (mappedIds.some((id) => ids.includes(id))) {
+        seen.add(name);
+      }
+    }
+    return seen.size;
+  };
+  const dedupedCountKeys: Record<string, Map<string, string[]>> = {
+    selected_key_ids: selectedKeyNameToIds,
+    virtual_key_ids: virtualKeyNameToIds,
+    routing_rule_ids: routingRuleNameToIds,
   };
 
   const excludedKeys = ["start_time", "end_time", "content_search", "metadata_filters"];
@@ -135,7 +167,8 @@ export function FilterPopover({ filters, onFilterChange, onMetadataFilterChange,
       return count;
     }
     if (Array.isArray(value)) {
-      return count + value.length;
+      const nameMap = dedupedCountKeys[key];
+      return count + (nameMap ? countUniqueNames(value, nameMap) : value.length);
     }
     return count + (value ? 1 : 0);
   }, 0) + (filters.metadata_filters ? Object.keys(filters.metadata_filters).length : 0);
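
Illustrative sketch (not part of the diff): assuming two deleted keys share the display name "prod-key" with ids "k1" and "k2" (all names and ids below are hypothetical), the name-to-IDs grouping and the toggle-all-together selection in the filterPopover hunk behave roughly as follows.

// Hypothetical data: two keys share a display name (e.g. a key was deleted and recreated).
const keys = [
  { name: "prod-key", id: "k1" },
  { name: "prod-key", id: "k2" },
  { name: "dev-key", id: "k3" },
];

// Mirrors groupByName: each display name maps to every matching id.
const nameToIds = new Map<string, string[]>();
for (const k of keys) {
  nameToIds.set(k.name, [...(nameToIds.get(k.name) || []), k.id]);
}
// nameToIds: "prod-key" => ["k1", "k2"], "dev-key" => ["k3"]

// Mirrors handleFilterSelect: selecting a name adds all of its ids,
// selecting it again removes all of them.
let selected: string[] = [];
const toggle = (name: string) => {
  const ids = nameToIds.get(name) || [name];
  const allSelected = ids.every((id) => selected.includes(id));
  selected = allSelected
    ? selected.filter((v) => !ids.includes(v))
    : [...selected, ...ids.filter((id) => !selected.includes(id))];
};
toggle("prod-key"); // selected = ["k1", "k2"]
toggle("prod-key"); // selected = []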