Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
be8d9bb
feat: 新增代理池并拆分请求超时配置
xunxun1982 Jun 8, 2026
ce70b17
fix: 优化代理池批量测试与代理 URL 校验
xunxun1982 Jun 8, 2026
71e3d0d
fix: 完善代理池审核问题修复
xunxun1982 Jun 8, 2026
ce091c2
fix: 加固旧代理迁移并发安全
xunxun1982 Jun 8, 2026
b189f2d
fix: 修复代理池错误分类与流式超时边界
xunxun1982 Jun 8, 2026
ccb4a3c
fix: 完善代理池错误处理与超时边界
xunxun1982 Jun 8, 2026
1d6b423
fix: 精简代理 URL 解析并补充超时兼容说明
xunxun1982 Jun 9, 2026
3b7faae
fix: 完善代理池检测配置与端口校验
xunxun1982 Jun 9, 2026
e485748
fix: 统一代理池检测超时边界
xunxun1982 Jun 9, 2026
5719d2f
fix: 完善代理池检测配置与超时兼容
xunxun1982 Jun 9, 2026
40bdb9b
fix: 收尾代理池分页与测试稳定性
xunxun1982 Jun 9, 2026
6872248
fix: 完善代理池审核修复与配置测试
xunxun1982 Jun 9, 2026
e39d8b9
fix: 修复代理池删除后的本地状态回滚
xunxun1982 Jun 9, 2026
f83bb72
fix: 修复代理池迁移日志与分页选择器
xunxun1982 Jun 9, 2026
ebb8763
fix: 串行化代理池名称索引迁移并修正删除回滚
xunxun1982 Jun 9, 2026
177e1e6
fix: 修复代理池审核问题并优化聚合健康度衰减
xunxun1982 Jun 10, 2026
64a71a5
fix: 优化聚合动态权重健康衰减与子分组默认权重
xunxun1982 Jun 10, 2026
cf5e7ae
fix: 简化临界健康度恢复权重逻辑
xunxun1982 Jun 10, 2026
ec1415c
docs: 补充 split timeout 归一化逻辑说明
xunxun1982 Jun 10, 2026
0e5ba2e
fix: 处理 PR #131 最新审核细节
xunxun1982 Jun 10, 2026
28d5656
fix: 处理 PR #131 代理池与动态权重审核意见
xunxun1982 Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (a *App) Start() error {
&models.GroupHourlyStat{},
&models.ModelTokenHourlyStat{},
&models.DynamicWeightMetric{},
&models.ProxyPoolItem{},
&sitemanagement.ManagedSite{},
&sitemanagement.ManagedSiteCheckinLog{},
&sitemanagement.ManagedSiteSetting{},
Expand Down
17 changes: 17 additions & 0 deletions internal/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"database/sql/driver"
"errors"
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -31,6 +33,21 @@ var failingCloseCalled atomic.Bool
var persistWALDriverMu sync.Mutex
var persistWALDriverConn *fakeSQLitePersistConn

func TestStartDelegatesProxyPoolNameMigrationToMigrateDatabase(t *testing.T) {
t.Parallel()

contentBytes, err := os.ReadFile("app.go")
require.NoError(t, err)
content := string(contentBytes)

require.NotContains(t, content, "V1_27_0_AddProxyPoolNameUniqueIndex")
autoMigrateIndex := strings.Index(content, "a.db.AutoMigrate(")
migrateDatabaseIndex := strings.Index(content, "dbmigrations.MigrateDatabase(a.db)")
require.NotEqual(t, -1, autoMigrateIndex)
require.NotEqual(t, -1, migrateDatabaseIndex)
require.Less(t, autoMigrateIndex, migrateDatabaseIndex)
}

type blockingCloseDriver struct{}

func (blockingCloseDriver) Open(_ string) (driver.Conn, error) {
Expand Down
17 changes: 17 additions & 0 deletions internal/channel/base_channel_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package channel

import (
"net/http/httptest"
"net/url"
"sync"
"sync/atomic"
"testing"

"github.com/gin-gonic/gin"
)

// mustParseURL is a test helper that parses a URL or panics
Expand Down Expand Up @@ -224,3 +227,17 @@ func TestSelectUpstreamConcurrency(t *testing.T) {
t.Errorf("SelectUpstream() returned nil %d times in concurrent test", errCount)
}
}

func TestBaseChannelIsStreamRequestDefaultsMissingStreamFieldToNonStream(t *testing.T) {
t.Parallel()

bc := &BaseChannel{}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest("POST", "/v1/chat/completions", nil)

result := bc.IsStreamRequest(c, []byte(`{"model":"gpt-test","messages":[]}`))
if result {
t.Fatal("request without stream indicators should be treated as non-stream")
}
}
4 changes: 2 additions & 2 deletions internal/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (f *Factory) newBaseChannel(name string, group *models.Group) (*BaseChannel
// Base configuration for regular requests, derived from the group's effective settings.
clientConfig := &httpclient.Config{
ConnectTimeout: time.Duration(group.EffectiveConfig.ConnectTimeout) * time.Second,
RequestTimeout: time.Duration(group.EffectiveConfig.RequestTimeout) * time.Second,
RequestTimeout: time.Duration(group.EffectiveConfig.NonStreamRequestTimeout) * time.Second,
IdleConnTimeout: time.Duration(group.EffectiveConfig.IdleConnTimeout) * time.Second,
MaxIdleConns: group.EffectiveConfig.MaxIdleConns,
MaxIdleConnsPerHost: group.EffectiveConfig.MaxIdleConnsPerHost,
Expand All @@ -261,7 +261,7 @@ func (f *Factory) newBaseChannel(name string, group *models.Group) (*BaseChannel

// Create a dedicated configuration for streaming requests.
streamConfig := *clientConfig
streamConfig.RequestTimeout = 0
streamConfig.RequestTimeout = time.Duration(group.EffectiveConfig.StreamRequestTimeout) * time.Second
streamConfig.DisableCompression = true
streamConfig.WriteBufferSize = 0
streamConfig.ReadBufferSize = 0
Expand Down
184 changes: 184 additions & 0 deletions internal/channel/factory_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package channel

import (
"context"
"encoding/json"
"gpt-load/internal/config"
"gpt-load/internal/httpclient"
"gpt-load/internal/models"
"gpt-load/internal/types"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -40,6 +44,186 @@ func setupTestFactoryForBenchmark() *Factory {
return NewFactory(settingsManager, clientManager)
}

func TestNewBaseChannelUsesSplitRequestTimeouts(t *testing.T) {
t.Parallel()

factory := setupTestFactory(t)
upstreams := []map[string]any{
{"url": "https://api.openai.com", "weight": 100},
}
upstreamsJSON, err := json.Marshal(upstreams)
require.NoError(t, err)

base, err := factory.newBaseChannel("openai", &models.Group{
ID: 1,
Name: "split-timeout-group",
ChannelType: "openai",
Upstreams: datatypes.JSON(upstreamsJSON),
EffectiveConfig: types.SystemSettings{
ConnectTimeout: 15,
RequestTimeout: 90,
NonStreamRequestTimeout: 45,
StreamRequestTimeout: 120,
IdleConnTimeout: 90,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 30,
},
})
require.NoError(t, err)
require.NotNil(t, base.HTTPClient)
require.NotNil(t, base.StreamClient)

assert.Equal(t, 45*time.Second, base.HTTPClient.Timeout)
assert.Equal(t, 120*time.Second, base.StreamClient.Timeout)
}

func TestNewBaseChannelAllowsUnlimitedStreamTimeout(t *testing.T) {
t.Parallel()

factory := setupTestFactory(t)
upstreams := []map[string]any{
{"url": "https://api.openai.com", "weight": 100},
}
upstreamsJSON, err := json.Marshal(upstreams)
require.NoError(t, err)

base, err := factory.newBaseChannel("openai", &models.Group{
ID: 1,
Name: "unlimited-stream-timeout-group",
ChannelType: "openai",
Upstreams: datatypes.JSON(upstreamsJSON),
EffectiveConfig: types.SystemSettings{
ConnectTimeout: 15,
RequestTimeout: 90,
NonStreamRequestTimeout: 45,
StreamRequestTimeout: 0,
IdleConnTimeout: 90,
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 30,
},
})
require.NoError(t, err)
require.NotNil(t, base.StreamClient)

assert.Zero(t, base.StreamClient.Timeout)
}

func TestNewBaseChannelUsesSelectedProxyForHTTPRequests(t *testing.T) {
t.Parallel()

upstreamServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
t.Cleanup(upstreamServer.Close)

proxyHits := make(chan string, 1)
proxyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
case proxyHits <- r.URL.String():
default:
}
w.WriteHeader(http.StatusNoContent)
}))
t.Cleanup(proxyServer.Close)

upstreamsJSON, err := json.Marshal([]map[string]any{
{"url": upstreamServer.URL, "weight": 100, "proxy_url": proxyServer.URL},
})
require.NoError(t, err)

base, err := setupTestFactory(t).newBaseChannel("openai", &models.Group{
ID: 1,
Name: "proxy-flow-group",
ChannelType: "openai",
Upstreams: datatypes.JSON(upstreamsJSON),
EffectiveConfig: types.SystemSettings{
ConnectTimeout: 1,
NonStreamRequestTimeout: 2,
StreamRequestTimeout: 0,
IdleConnTimeout: 30,
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 2,
},
})
require.NoError(t, err)

selection, err := base.SelectUpstreamWithClients(mustParseURL("/proxy/proxy-flow-group/v1/models"), "proxy-flow-group")
require.NoError(t, err)
require.NotNil(t, selection.ProxyURL)
require.Equal(t, proxyServer.URL, *selection.ProxyURL)

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, selection.URL, nil)
require.NoError(t, err)
resp, err := selection.HTTPClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusNoContent, resp.StatusCode)

select {
case requestedURL := <-proxyHits:
assert.Equal(t, selection.URL, requestedURL)
case <-time.After(2 * time.Second):
t.Fatal("expected request to pass through configured proxy")
}
}

func TestNewBaseChannelUsesDirectClientWhenProxySelectionIsEmpty(t *testing.T) {
t.Setenv("HTTP_PROXY", "")
t.Setenv("HTTPS_PROXY", "")
t.Setenv("ALL_PROXY", "")
t.Setenv("NO_PROXY", "")

upstreamHits := make(chan struct{}, 1)
upstreamServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
upstreamHits <- struct{}{}
w.WriteHeader(http.StatusNoContent)
}))
t.Cleanup(upstreamServer.Close)

upstreamsJSON, err := json.Marshal([]map[string]any{
{"url": upstreamServer.URL, "weight": 100, "proxy_url": ""},
})
require.NoError(t, err)

base, err := setupTestFactory(t).newBaseChannel("openai", &models.Group{
ID: 1,
Name: "empty-proxy-group",
ChannelType: "openai",
Upstreams: datatypes.JSON(upstreamsJSON),
EffectiveConfig: types.SystemSettings{
ConnectTimeout: 1,
NonStreamRequestTimeout: 2,
StreamRequestTimeout: 0,
IdleConnTimeout: 30,
MaxIdleConns: 10,
MaxIdleConnsPerHost: 10,
ResponseHeaderTimeout: 2,
ProxyURL: "",
},
})
require.NoError(t, err)

selection, err := base.SelectUpstreamWithClients(mustParseURL("/proxy/empty-proxy-group/v1/models"), "empty-proxy-group")
require.NoError(t, err)
require.Nil(t, selection.ProxyURL)

req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, selection.URL, nil)
require.NoError(t, err)
resp, err := selection.HTTPClient.Do(req)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
require.Equal(t, http.StatusNoContent, resp.StatusCode)

select {
case <-upstreamHits:
case <-time.After(2 * time.Second):
t.Fatal("expected empty proxy selection to reach upstream directly")
}
}

// TestNewFactory tests factory creation
func TestNewFactory(t *testing.T) {
factory := setupTestFactory(t)
Expand Down
48 changes: 47 additions & 1 deletion internal/config/system_settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"gpt-load/internal/syncer"
"gpt-load/internal/types"
"gpt-load/internal/utils"
"net/url"
"os"
"reflect"
"strconv"
Expand All @@ -33,12 +34,43 @@ func NewSystemSettingsManager() *SystemSettingsManager {
return &SystemSettingsManager{}
}

// normalizeSplitRequestTimeouts keeps RequestTimeout synced to NonStreamRequestTimeout,
// which is the source of truth for split timeout configuration.
// It handles legacy-only backfill, explicit non-stream values including zero,
// and the already-synced defaults when neither setting was supplied.
func normalizeSplitRequestTimeouts(settings *types.SystemSettings, hasLegacy, hasNonStream bool) {
if settings == nil {
return
}
if hasNonStream {
// Explicit zero disables non-stream timeout; keep legacy fallback synced to the same value.
settings.RequestTimeout = settings.NonStreamRequestTimeout
return
}
if hasLegacy {
settings.NonStreamRequestTimeout = settings.RequestTimeout
return
}
// Defaults already keep both fields in sync when neither key was supplied.
}

func validateStringSettingValue(key, val string) error {
if key == "failover_status_codes" {
if _, err := failover.ParseStatusCodeMatcher(val); err != nil {
return fmt.Errorf("invalid value for %s (%q): %w", key, val, err)
}
}
if key == "proxy_url" {
if _, err := utils.NormalizeProxyURL(val); err != nil {
return fmt.Errorf("invalid value for %s: %w", key, err)
}
}
if key == "proxy_pool_test_target_url" {
parsed, err := url.Parse(strings.TrimSpace(val))
if err != nil || parsed == nil || parsed.Host == "" || (parsed.Scheme != "http" && parsed.Scheme != "https") {
return fmt.Errorf("invalid value for %s: must be an absolute http or https URL", key)
}
}
return nil
}

Expand Down Expand Up @@ -82,6 +114,9 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager,
}
}
}
_, hasLegacyTimeout := settingsMap["request_timeout"]
_, hasNonStreamTimeout := settingsMap["non_stream_request_timeout"]
normalizeSplitRequestTimeouts(&settings, hasLegacyTimeout, hasNonStreamTimeout)

settings.ProxyKeysMap = utils.StringToSet(settings.ProxyKeys, ",")

Expand Down Expand Up @@ -197,6 +232,11 @@ func (sm *SystemSettingsManager) UpdateSettings(settingsMap map[string]any) erro

// Update database
var settingsToUpdate []models.SystemSetting
if nonStreamTimeout, hasNonStream := settingsMap["non_stream_request_timeout"]; hasNonStream {
settingsMap["request_timeout"] = nonStreamTimeout
} else if legacyTimeout, hasLegacy := settingsMap["request_timeout"]; hasLegacy {
settingsMap["non_stream_request_timeout"] = legacyTimeout
}
for key, value := range settingsMap {
settingsToUpdate = append(settingsToUpdate, models.SystemSetting{
SettingKey: key,
Expand Down Expand Up @@ -274,6 +314,11 @@ func (sm *SystemSettingsManager) GetEffectiveConfig(groupConfigJSON datatypes.JS
}
}
}
normalizeSplitRequestTimeouts(
&effectiveConfig,
groupConfig.RequestTimeout != nil,
groupConfig.NonStreamRequestTimeout != nil,
)

return effectiveConfig
}
Expand Down Expand Up @@ -587,7 +632,8 @@ func (sm *SystemSettingsManager) DisplaySystemConfig(settings types.SystemSettin
logrus.Infof(" Request Log Write Interval: %d minutes", settings.RequestLogWriteIntervalMinutes)

logrus.Info(" --- Request Behavior ---")
logrus.Infof(" Request Timeout: %d seconds", settings.RequestTimeout)
logrus.Infof(" Non-Stream Request Timeout: %d seconds", settings.NonStreamRequestTimeout)
logrus.Infof(" Stream Request Timeout: %d seconds", settings.StreamRequestTimeout)
logrus.Infof(" Connect Timeout: %d seconds", settings.ConnectTimeout)
logrus.Infof(" Response Header Timeout: %d seconds", settings.ResponseHeaderTimeout)
logrus.Infof(" Idle Connection Timeout: %d seconds", settings.IdleConnTimeout)
Expand Down
Loading