Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/creack/pty v1.1.24
github.com/maximhq/vt10x v0.0.0-20260312213827-20648b37d999
github.com/zalando/go-keyring v0.2.6
golang.org/x/term v0.40.0
golang.org/x/term v0.41.0
)

require (
Expand Down Expand Up @@ -47,6 +47,6 @@ require (
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
golang.org/x/arch v0.23.0 // indirect
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.33.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
)
12 changes: 6 additions & 6 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 h1:zfMcR1Cs4KNuomFFgGefv5N0c
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.40.0 h1:36e4zGLqU4yhjlmxEaagx2KuYbJq3EwY8K943ZsHcvg=
golang.org/x/term v0.40.0/go.mod h1:w2P8uVp06p2iyKKuvXIm7N/y0UCRt3UfJTfZ7oOpglM=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU=
golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
8 changes: 4 additions & 4 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ require (
github.com/valyala/fasthttp v1.68.0
go.starlark.net v0.0.0-20260102030733-3fee463870c9
golang.org/x/oauth2 v0.35.0
golang.org/x/text v0.33.0
golang.org/x/text v0.35.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -72,8 +72,8 @@ require (
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
golang.org/x/arch v0.23.0 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/net v0.49.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/crypto v0.49.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)
16 changes: 8 additions & 8 deletions core/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,21 @@ go.starlark.net v0.0.0-20260102030733-3fee463870c9 h1:nV1OyvU+0CYrp5eKfQ3rD03TpF
go.starlark.net v0.0.0-20260102030733-3fee463870c9/go.mod h1:YKMCv9b1WrfWmeqdV5MAuEHWsu5iC+fe6kYl2sQjdI8=
golang.org/x/arch v0.23.0 h1:lKF64A2jF6Zd8L0knGltUnegD62JMFBiCPBmQpToHhg=
golang.org/x/arch v0.23.0/go.mod h1:dNHoOeKiyja7GTvF9NJS1l3Z2yntpQNzgrjh1cU103A=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o=
golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8=
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=
golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sys v0.0.0-20220712014510-0a85c31ab51e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k=
golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion framework/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.41.5 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/buger/jsonparser v1.1.2 // indirect
github.com/bytedance/sonic v1.15.0
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.6 // indirect
Expand Down
4 changes: 2 additions & 2 deletions framework/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/buger/jsonparser v1.1.2 h1:frqHqw7otoVbk5M8LlE/L7HTnIq2v9RX6EJ48i9AxJk=
github.com/buger/jsonparser v1.1.2/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
github.com/bytedance/sonic v1.15.0 h1:/PXeWFaR5ElNcVE84U0dOHjiMHQOwNIx3K4ymzh/uSE=
Expand Down
218 changes: 101 additions & 117 deletions framework/logstore/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ const (
// perfIndexAdvisoryLockKey serializes the background performance index build
// (trigram + routing engine GIN indexes) across cluster nodes.
perfIndexAdvisoryLockKey = 1000003

// dashboardEnhancementsAdvisoryLockKey serializes the background dashboard
// enhancements work (backfill + covering index rebuild) across cluster nodes.
dashboardEnhancementsAdvisoryLockKey = 1000004
)

// advisoryLock holds a dedicated connection and the advisory lock key.
Expand Down Expand Up @@ -95,6 +99,12 @@ func acquirePerfIndexLock(ctx context.Context, db *gorm.DB) (*advisoryLock, erro
return acquireAdvisoryLock(ctx, db, perfIndexAdvisoryLockKey, "perf_index")
}

// acquireDashboardEnhancementsLock acquires the cluster-wide serialization lock for
// the background dashboard enhancements work (cached_read_tokens backfill + covering
// index rebuild), using dashboardEnhancementsAdvisoryLockKey. It mirrors
// acquirePerfIndexLock so only one node performs the expensive work at a time.
// The caller must release the returned lock (see advisoryLock.release).
func acquireDashboardEnhancementsLock(ctx context.Context, db *gorm.DB) (*advisoryLock, error) {
	return acquireAdvisoryLock(ctx, db, dashboardEnhancementsAdvisoryLockKey, "dashboard_enhancements")
}

// triggerMigrations runs all registered logstore schema migrations in order under a
// PostgreSQL advisory lock (shared with configstore) so only one node migrates at a time.
func triggerMigrations(ctx context.Context, db *gorm.DB) error {
Expand Down Expand Up @@ -1913,9 +1923,10 @@ func ensureMetadataGINIndex(ctx context.Context, db *gorm.DB) error {
return nil
}

// migrationAddDashboardEnhancements adds cached_read_tokens column to logs table,
// updates the histogram covering index to include it, and adds MCP histogram covering index.
// All in a single migration to keep schema changes atomic.
// migrationAddDashboardEnhancements adds cached_read_tokens column to logs table.
// The expensive backfill, covering index rebuild, and MCP index creation are deferred
// to ensureDashboardEnhancements (called post-startup in a background goroutine) so
// they do not block pod startup on large tables.
func migrationAddDashboardEnhancements(ctx context.Context, db *gorm.DB) error {
opts := *migrator.DefaultOptions
opts.UseTransaction = false
Expand All @@ -1924,135 +1935,21 @@ func migrationAddDashboardEnhancements(ctx context.Context, db *gorm.DB) error {
Migrate: func(tx *gorm.DB) error {
tx = tx.WithContext(ctx)
dbMigrator := tx.Migrator()
dialect := tx.Dialector.Name()

// Step 1: Add cached_read_tokens column to logs
if !dbMigrator.HasColumn(&Log{}, "cached_read_tokens") {
if err := dbMigrator.AddColumn(&Log{}, "CachedReadTokens"); err != nil {
return fmt.Errorf("failed to add cached_read_tokens column: %w", err)
}
}

// Step 2: Backfill cached_read_tokens from token_usage JSON
var backfillSQL string
switch dialect {
case "sqlite":
backfillSQL = `UPDATE logs SET
cached_read_tokens = COALESCE(json_extract(token_usage, '$.prompt_tokens_details.cached_read_tokens'), 0)
WHERE token_usage IS NOT NULL AND token_usage != '' AND token_usage != 'null' AND json_valid(token_usage)`
case "mysql":
backfillSQL = `UPDATE logs SET
cached_read_tokens = COALESCE(CAST(JSON_UNQUOTE(JSON_EXTRACT(token_usage, '$.prompt_tokens_details.cached_read_tokens')) AS SIGNED), 0)
WHERE token_usage IS NOT NULL AND token_usage != '' AND token_usage != 'null' AND JSON_VALID(token_usage)`
default: // postgres
backfillSQL = `UPDATE logs SET
cached_read_tokens = COALESCE((token_usage::jsonb->'prompt_tokens_details'->>'cached_read_tokens')::int, 0)
WHERE token_usage IS NOT NULL AND token_usage != '' AND token_usage != 'null'
AND token_usage ~ '^\s*\{.*\}\s*$'`
}
if err := tx.Exec(backfillSQL).Error; err != nil {
return fmt.Errorf("failed to backfill cached_read_tokens: %w", err)
}

// Step 3: Drop and recreate covering index with cached_read_tokens
if dbMigrator.HasIndex(&Log{}, "idx_logs_histogram_cover") {
switch dialect {
case "mysql":
if err := tx.Exec("DROP INDEX idx_logs_histogram_cover ON logs").Error; err != nil {
return fmt.Errorf("failed to drop old covering index: %w", err)
}
default:
if err := tx.Exec("DROP INDEX IF EXISTS idx_logs_histogram_cover").Error; err != nil {
return fmt.Errorf("failed to drop old covering index: %w", err)
}
}
}

var createLogsIndexSQL string
switch dialect {
case "mysql":
createLogsIndexSQL = `CREATE INDEX idx_logs_histogram_cover ON logs(
status(50), timestamp,
selected_key_id(50), virtual_key_id(50), routing_rule_id(50), provider(50), object_type(50),
model(50), cost, prompt_tokens, completion_tokens, total_tokens, cached_read_tokens
)`
default:
createLogsIndexSQL = `CREATE INDEX IF NOT EXISTS idx_logs_histogram_cover ON logs(
status, timestamp,
selected_key_id, virtual_key_id, routing_rule_id, provider, object_type,
model, cost, prompt_tokens, completion_tokens, total_tokens, cached_read_tokens
)`
}
if err := tx.Exec(createLogsIndexSQL).Error; err != nil {
return fmt.Errorf("failed to create updated covering index: %w", err)
}

// Step 4: Add MCP histogram covering index
if !dbMigrator.HasIndex(&MCPToolLog{}, "idx_mcp_logs_histogram_cover") {
var createMCPIndexSQL string
switch dialect {
case "mysql":
createMCPIndexSQL = `CREATE INDEX idx_mcp_logs_histogram_cover ON mcp_tool_logs(
status(50), timestamp, tool_name(50), server_label(50), virtual_key_id(50), cost
)`
default:
createMCPIndexSQL = `CREATE INDEX IF NOT EXISTS idx_mcp_logs_histogram_cover ON mcp_tool_logs(
status, timestamp, tool_name, server_label, virtual_key_id, cost
)`
}
if err := tx.Exec(createMCPIndexSQL).Error; err != nil {
return fmt.Errorf("failed to create MCP histogram covering index: %w", err)
}
}

return nil
},
Rollback: func(tx *gorm.DB) error {
tx = tx.WithContext(ctx)
dbMigrator := tx.Migrator()
dialect := tx.Dialector.Name()

// Drop MCP covering index
if dbMigrator.HasIndex(&MCPToolLog{}, "idx_mcp_logs_histogram_cover") {
switch dialect {
case "mysql":
_ = tx.Exec("DROP INDEX idx_mcp_logs_histogram_cover ON mcp_tool_logs")
default:
_ = tx.Exec("DROP INDEX IF EXISTS idx_mcp_logs_histogram_cover")
}
}

// Revert covering index to original (without cached_read_tokens)
if dbMigrator.HasIndex(&Log{}, "idx_logs_histogram_cover") {
switch dialect {
case "mysql":
_ = tx.Exec("DROP INDEX idx_logs_histogram_cover ON logs")
default:
_ = tx.Exec("DROP INDEX IF EXISTS idx_logs_histogram_cover")
}
}
var revertSQL string
switch dialect {
case "mysql":
revertSQL = `CREATE INDEX idx_logs_histogram_cover ON logs(
status(50), timestamp,
selected_key_id(50), virtual_key_id(50), routing_rule_id(50), provider(50), object_type(50),
model(50), cost, prompt_tokens, completion_tokens, total_tokens
)`
default:
revertSQL = `CREATE INDEX IF NOT EXISTS idx_logs_histogram_cover ON logs(
status, timestamp,
selected_key_id, virtual_key_id, routing_rule_id, provider, object_type,
model, cost, prompt_tokens, completion_tokens, total_tokens
)`
}
_ = tx.Exec(revertSQL)

// Drop cached_read_tokens column
if dbMigrator.HasColumn(&Log{}, "cached_read_tokens") {
_ = dbMigrator.DropColumn(&Log{}, "cached_read_tokens")
}

return nil
},
}})
Expand All @@ -2062,6 +1959,93 @@ func migrationAddDashboardEnhancements(ctx context.Context, db *gorm.DB) error {
return nil
}

// ensureDashboardEnhancements performs the expensive dashboard migration work that was
// deferred from migrationAddDashboardEnhancements: backfilling cached_read_tokens from
// the token_usage JSON, rebuilding the histogram covering index to include the new column,
// and creating the MCP histogram covering index.
//
// This is intentionally separate so that the long-running UPDATE and index rebuild do not
// block pod startup. Callers that want non-blocking behaviour should invoke this in a
// goroutine (see postgres.go). All operations are idempotent and safe to re-run.
func ensureDashboardEnhancements(ctx context.Context, db *gorm.DB) error {
if db.Dialector.Name() != "postgres" {
return nil
}

lock, err := acquireDashboardEnhancementsLock(ctx, db)
if err != nil {
return err
}
defer lock.release(context.Background())

// Backfill cached_read_tokens from token_usage JSON.
// The extra `AND cached_read_tokens = 0` plus `AND COALESCE(...) > 0` makes
// re-runs cheap: rows already backfilled have non-zero values (skipped),
// and rows with genuinely zero cached tokens are also skipped (correct as-is).
backfillSQL := `UPDATE logs SET
cached_read_tokens = (token_usage::jsonb->'prompt_tokens_details'->>'cached_read_tokens')::int
WHERE cached_read_tokens = 0
AND token_usage IS NOT NULL AND token_usage != '' AND token_usage != 'null'
AND token_usage ~ '^\s*\{.*\}\s*$'
AND COALESCE((token_usage::jsonb->'prompt_tokens_details'->>'cached_read_tokens')::int, 0) > 0`
if err := db.WithContext(ctx).Exec(backfillSQL).Error; err != nil {
return fmt.Errorf("failed to backfill cached_read_tokens: %w", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Rebuild histogram covering index with cached_read_tokens included,
// but only if missing or invalid (skip if already healthy).
var logsIndexValid bool
if err := db.WithContext(ctx).Raw(`
SELECT COALESCE(bool_and(pi.indisvalid), false)
FROM pg_class pc
JOIN pg_index pi ON pi.indrelid = pc.oid
JOIN pg_class ic ON ic.oid = pi.indexrelid
WHERE pc.relname = 'logs'
AND ic.relname = 'idx_logs_histogram_cover'
`).Scan(&logsIndexValid).Error; err != nil {
return fmt.Errorf("failed to check logs histogram index validity: %w", err)
}
if !logsIndexValid {
if err := db.WithContext(ctx).Exec("DROP INDEX CONCURRENTLY IF EXISTS idx_logs_histogram_cover").Error; err != nil {
return fmt.Errorf("failed to drop old covering index: %w", err)
}
createLogsIndexSQL := `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_logs_histogram_cover ON logs(
status, timestamp,
selected_key_id, virtual_key_id, routing_rule_id, provider, object_type,
model, cost, prompt_tokens, completion_tokens, total_tokens, cached_read_tokens
)`
if err := db.WithContext(ctx).Exec(createLogsIndexSQL).Error; err != nil {
return fmt.Errorf("failed to create updated covering index: %w", err)
}
}

// Create MCP histogram covering index if missing or invalid.
var mcpIndexValid bool
if err := db.WithContext(ctx).Raw(`
SELECT COALESCE(bool_and(pi.indisvalid), false)
FROM pg_class pc
JOIN pg_index pi ON pi.indrelid = pc.oid
JOIN pg_class ic ON ic.oid = pi.indexrelid
WHERE pc.relname = 'mcp_tool_logs'
AND ic.relname = 'idx_mcp_logs_histogram_cover'
`).Scan(&mcpIndexValid).Error; err != nil {
return fmt.Errorf("failed to check MCP histogram index validity: %w", err)
}
if !mcpIndexValid {
if err := db.WithContext(ctx).Exec("DROP INDEX CONCURRENTLY IF EXISTS idx_mcp_logs_histogram_cover").Error; err != nil {
return fmt.Errorf("failed to drop invalid MCP histogram index: %w", err)
}
createMCPIndexSQL := `CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_mcp_logs_histogram_cover ON mcp_tool_logs(
status, timestamp, tool_name, server_label, virtual_key_id, cost
)`
if err := db.WithContext(ctx).Exec(createMCPIndexSQL).Error; err != nil {
return fmt.Errorf("failed to create MCP histogram covering index: %w", err)
}
}

return nil
}

// migrationAddPerformanceGINIndexes records the migration version for the performance
// indexes. Actual index creation is deferred to ensurePerformanceIndexes (called
// post-startup in a background goroutine) because CREATE INDEX CONCURRENTLY cannot
Expand Down
11 changes: 11 additions & 0 deletions framework/logstore/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,16 @@ func newPostgresLogStore(ctx context.Context, config *PostgresConfig, logger sch
logger.Info("logstore: performance indexes are ready")
}()

// Run the expensive dashboard enhancements (backfill cached_read_tokens,
// rebuild covering index, create MCP covering index) in a background
// goroutine so pod startup is not blocked on large tables.
go func() {
if err := ensureDashboardEnhancements(context.Background(), db); err != nil {
logger.Warn(fmt.Sprintf("logstore: dashboard enhancements failed: %s (dashboard will still work with partial data)", err))
return
}
logger.Info("logstore: dashboard enhancements completed")
}()

return d, nil
}
Loading
Loading