From 5b470db43ad8b95e6bc108a140720ed180c7feb8 Mon Sep 17 00:00:00 2001 From: zirain Date: Mon, 19 Jan 2026 12:33:55 +0800 Subject: [PATCH 1/8] feat: Ignore ready and stats listener metrics in shutdown manager calculation Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 98 +++++++++-- internal/cmd/envoy/shutdown_manager_test.go | 186 ++++++++++++++++++++ 2 files changed, 272 insertions(+), 12 deletions(-) create mode 100644 internal/cmd/envoy/shutdown_manager_test.go diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index f1c82dae79..3b4b144a84 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -10,9 +10,11 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "os" "os/signal" + "regexp" "syscall" "time" @@ -132,12 +134,14 @@ func Shutdown(drainTimeout, minDrainDuration time.Duration, exitAtConnections in logger.Error(err, "error failing active health checks") } + useServerConnections := os.Getenv("USE_SERVER_CONNECTIONS") == "true" + // Poll total connections from Envoy admin API until minimum drain period has // been reached and total connections reaches threshold or timeout is exceeded for { elapsedTime := time.Since(startTime) - conn, err := getTotalConnections() + conn, err := getTotalConnections(useServerConnections) if err != nil { logger.Error(err, "error getting total connections") } @@ -169,21 +173,30 @@ func Shutdown(drainTimeout, minDrainDuration time.Duration, exitAtConnections in // postEnvoyAdminAPI sends a POST request to the Envoy admin API func postEnvoyAdminAPI(path string) error { - if resp, err := http.Post(fmt.Sprintf("http://%s:%d/%s", - "localhost", bootstrap.EnvoyAdminPort, path), "application/json", nil); err != nil { + resp, err := http.Post(fmt.Sprintf("http://%s:%d/%s", + "localhost", bootstrap.EnvoyAdminPort, path), "application/json", nil) + if err != nil { return err - } else { - defer resp.Body.Close() + } + defer func() { + _ = resp.Body.Close() + }() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected response status: %s", resp.Status) - } - return nil + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected response status: %s", resp.Status) } + return nil } -// getTotalConnections retrieves the total number of open connections from Envoy's server.total_connections stat -func getTotalConnections() (*int, error) { +func getTotalConnections(useServerTotalConnections bool) (*int, error) { + if useServerTotalConnections { + return getServerConnections() + } + return getDownstreamCXActive() +} + +// getServerConnections retrieves the total number of open connections from Envoy's server.total_connections stat +func getServerConnections() (*int, error) { // Send request to Envoy admin API to retrieve server.total_connections stat if resp, err := http.Get(fmt.Sprintf("http://%s:%d//stats?filter=^server\\.total_connections$&format=json", "localhost", bootstrap.EnvoyAdminPort)); err != nil { @@ -215,8 +228,69 @@ func getTotalConnections() (*int, error) { // Log and return total connections c := r.Stats[0].Value - logger.Info(fmt.Sprintf("total connections: %d", c)) + logger.Info(fmt.Sprintf("total server connections: %d", c)) return &c, nil } } } + +// Define struct to decode JSON response into; expecting a single stat in the response in the format: +// {"stats":[{"name":"server.total_connections","value":123}]} +type envoyStatsResponse struct { + Stats []struct { + Name string + Value int + } +} + +// skipConnectionRE is a regex to match connection stats to be excluded from total connections count +// e.g. admin, ready and stat listener +var skipConnectionRE = regexp.MustCompile(`admin|19001|19003|worker`) + +// getDownstreamCXActive retrieves the total number of open connections from Envoy's listener downstream_cx_active stat +func getDownstreamCXActive() (*int, error) { + // Send request to Envoy admin API to retrieve listener.\.$.downstream_cx_active stat + statFilter := "^listener\\..*\\.downstream_cx_active$" + resp, err := http.Get(fmt.Sprintf("http://%s:%d//stats?filter=%s&format=json", + "localhost", bootstrap.EnvoyAdminPort, statFilter)) + if err != nil { + return nil, err + } + defer func() { + _ = resp.Body.Close() + }() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected response status: %s", resp.Status) + } + + totalConnection, err := parseTotalConnection(resp.Body) + if err != nil { + logger.Error(err, "error parsing total connections from response") + return nil, err + } + + logger.Info(fmt.Sprintf("total downstream connections: %d", totalConnection)) + return totalConnection, nil +} + +func parseTotalConnection(statResponse io.ReadCloser) (*int, error) { + r := &envoyStatsResponse{} + // Decode JSON response into struct + if err := json.NewDecoder(statResponse).Decode(&r); err != nil { + return nil, err + } + + // Defensive check for empty stats + if len(r.Stats) == 0 { + return nil, fmt.Errorf("no stats found") + } + + totalConnection := 0 + for _, stat := range r.Stats { + if excluded := skipConnectionRE.MatchString(stat.Name); !excluded { + totalConnection += stat.Value + } + } + + return &totalConnection, nil +} diff --git a/internal/cmd/envoy/shutdown_manager_test.go b/internal/cmd/envoy/shutdown_manager_test.go new file mode 100644 index 0000000000..53d7057b84 --- /dev/null +++ b/internal/cmd/envoy/shutdown_manager_test.go @@ -0,0 +1,186 @@ +// Copyright Envoy Gateway Authors +// SPDX-License-Identifier: Apache-2.0 +// The full text of the Apache license is available in the LICENSE file at +// the root of the repo. + +package envoy + +import ( + "io" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" +) + +func TestGetTotalConnection(t *testing.T) { + cases := []struct { + input string + + expectedError error + expectedCount *int + }{ + { + input: `{ + "stats": [ + { + "name": "listener.0.0.0.0_8000.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_0.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_1.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_2.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_3.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_4.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_5.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_6.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_7.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_8.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.0.0.0.0_8000.worker_9.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_0.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_1.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_2.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_3.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_4.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_5.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_6.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_7.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_8.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8080.worker_9.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_0.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_1.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_2.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_3.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_4.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_5.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_6.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_7.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_8.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.127.0.0.1_8081.worker_9.downstream_cx_active", + "value": 0 + }, + { + "name": "listener.admin.downstream_cx_active", + "value": 2 + }, + { + "name": "listener.admin.main_thread.downstream_cx_active", + "value": 2 + } + ] +}`, + expectedCount: ptr.To(0), + }, + } + + for _, tc := range cases { + t.Run("", func(t *testing.T) { + reader := strings.NewReader(tc.input) + rc := io.NopCloser(reader) + defer func() { + _ = rc.Close() + }() + gotCount, gotError := parseTotalConnection(rc) + require.Equal(t, tc.expectedError, gotError) + require.Equal(t, tc.expectedCount, gotCount) + }) + } + +} From 2dc887624babbc29744ca6a7e6cba21884e6548f Mon Sep 17 00:00:00 2001 From: zirain Date: Mon, 19 Jan 2026 13:00:19 +0800 Subject: [PATCH 2/8] fix Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 25 +++++++++------------ internal/cmd/envoy/shutdown_manager_test.go | 1 - 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 3b4b144a84..1d49c0322f 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -195,6 +195,15 @@ func getTotalConnections(useServerTotalConnections bool) (*int, error) { return getDownstreamCXActive() } +// Define struct to decode JSON response into; expecting a single stat in the response in the format: +// {"stats":[{"name":"server.total_connections","value":123}]} +type envoyStatsResponse struct { + Stats []struct { + Name string + Value int + } +} + // getServerConnections retrieves the total number of open connections from Envoy's server.total_connections stat func getServerConnections() (*int, error) { // Send request to Envoy admin API to retrieve server.total_connections stat @@ -209,12 +218,7 @@ func getServerConnections() (*int, error) { } else { // Define struct to decode JSON response into; expecting a single stat in the response in the format: // {"stats":[{"name":"server.total_connections","value":123}]} - var r *struct { - Stats []struct { - Name string - Value int - } - } + r := &envoyStatsResponse{} // Decode JSON response into struct if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { @@ -234,15 +238,6 @@ func getServerConnections() (*int, error) { } } -// Define struct to decode JSON response into; expecting a single stat in the response in the format: -// {"stats":[{"name":"server.total_connections","value":123}]} -type envoyStatsResponse struct { - Stats []struct { - Name string - Value int - } -} - // skipConnectionRE is a regex to match connection stats to be excluded from total connections count // e.g. admin, ready and stat listener var skipConnectionRE = regexp.MustCompile(`admin|19001|19003|worker`) diff --git a/internal/cmd/envoy/shutdown_manager_test.go b/internal/cmd/envoy/shutdown_manager_test.go index 53d7057b84..e444b500bb 100644 --- a/internal/cmd/envoy/shutdown_manager_test.go +++ b/internal/cmd/envoy/shutdown_manager_test.go @@ -182,5 +182,4 @@ func TestGetTotalConnection(t *testing.T) { require.Equal(t, tc.expectedCount, gotCount) }) } - } From fd38e6867050e1f4fab603b948a6b4b7cda5e2e1 Mon Sep 17 00:00:00 2001 From: zirain Date: Mon, 19 Jan 2026 13:09:46 +0800 Subject: [PATCH 3/8] fix Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 6 +++--- internal/cmd/envoy/shutdown_manager_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 1d49c0322f..79d5857e04 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -258,9 +258,9 @@ func getDownstreamCXActive() (*int, error) { return nil, fmt.Errorf("unexpected response status: %s", resp.Status) } - totalConnection, err := parseTotalConnection(resp.Body) + totalConnection, err := parseDownstreamCXActive(resp.Body) if err != nil { - logger.Error(err, "error parsing total connections from response") + logger.Error(err, "error parsing downstream connections from response") return nil, err } @@ -268,7 +268,7 @@ func getDownstreamCXActive() (*int, error) { return totalConnection, nil } -func parseTotalConnection(statResponse io.ReadCloser) (*int, error) { +func parseDownstreamCXActive(statResponse io.ReadCloser) (*int, error) { r := &envoyStatsResponse{} // Decode JSON response into struct if err := json.NewDecoder(statResponse).Decode(&r); err != nil { diff --git a/internal/cmd/envoy/shutdown_manager_test.go b/internal/cmd/envoy/shutdown_manager_test.go index e444b500bb..3404ed3fdf 100644 --- a/internal/cmd/envoy/shutdown_manager_test.go +++ b/internal/cmd/envoy/shutdown_manager_test.go @@ -14,7 +14,7 @@ import ( "k8s.io/utils/ptr" ) -func TestGetTotalConnection(t *testing.T) { +func TestGetDownstreamCXActive(t *testing.T) { cases := []struct { input string @@ -177,7 +177,7 @@ func TestGetTotalConnection(t *testing.T) { defer func() { _ = rc.Close() }() - gotCount, gotError := parseTotalConnection(rc) + gotCount, gotError := parseDownstreamCXActive(rc) require.Equal(t, tc.expectedError, gotError) require.Equal(t, tc.expectedCount, gotCount) }) From 70c05a0a56435a5a504494d9e9ec6697e1cdc7ff Mon Sep 17 00:00:00 2001 From: zirain Date: Mon, 19 Jan 2026 15:48:26 +0800 Subject: [PATCH 4/8] refactor Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 101 +++++++++----------- internal/cmd/envoy/shutdown_manager_test.go | 84 ++++++++++++++-- 2 files changed, 120 insertions(+), 65 deletions(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 79d5857e04..6428269f90 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -10,11 +10,12 @@ import ( "encoding/json" "errors" "fmt" - "io" + "net" "net/http" "os" "os/signal" "regexp" + "strconv" "syscall" "time" @@ -141,7 +142,7 @@ func Shutdown(drainTimeout, minDrainDuration time.Duration, exitAtConnections in for { elapsedTime := time.Since(startTime) - conn, err := getTotalConnections(useServerConnections) + conn, err := getTotalConnections(useServerConnections, bootstrap.EnvoyAdminPort) if err != nil { logger.Error(err, "error getting total connections") } @@ -188,11 +189,11 @@ func postEnvoyAdminAPI(path string) error { return nil } -func getTotalConnections(useServerTotalConnections bool) (*int, error) { +func getTotalConnections(useServerTotalConnections bool, port int) (*int, error) { if useServerTotalConnections { - return getServerConnections() + return getServerConnections(port) } - return getDownstreamCXActive() + return getDownstreamCXActive(port) } // Define struct to decode JSON response into; expecting a single stat in the response in the format: @@ -205,52 +206,27 @@ type envoyStatsResponse struct { } // getServerConnections retrieves the total number of open connections from Envoy's server.total_connections stat -func getServerConnections() (*int, error) { +func getServerConnections(port int) (*int, error) { // Send request to Envoy admin API to retrieve server.total_connections stat - if resp, err := http.Get(fmt.Sprintf("http://%s:%d//stats?filter=^server\\.total_connections$&format=json", - "localhost", bootstrap.EnvoyAdminPort)); err != nil { - return nil, err - } else { - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected response status: %s", resp.Status) - } else { - // Define struct to decode JSON response into; expecting a single stat in the response in the format: - // {"stats":[{"name":"server.total_connections","value":123}]} - r := &envoyStatsResponse{} - - // Decode JSON response into struct - if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { - return nil, err - } - - // Defensive check for empty stats - if len(r.Stats) == 0 { - return nil, fmt.Errorf("no stats found") - } - - // Log and return total connections - c := r.Stats[0].Value - logger.Info(fmt.Sprintf("total server connections: %d", c)) - return &c, nil - } + statFilter := "^server\\.total_connections$" + r, err := getStatsFromEnvoyStatsEndpoint(port, statFilter) + if err != nil { + return nil, fmt.Errorf("error getting server total_connections stat: %w", err) } -} -// skipConnectionRE is a regex to match connection stats to be excluded from total connections count -// e.g. admin, ready and stat listener -var skipConnectionRE = regexp.MustCompile(`admin|19001|19003|worker`) + // Log and return total connections + c := r.Stats[0].Value + logger.Info(fmt.Sprintf("total server connections: %d", c)) + return &c, nil +} -// getDownstreamCXActive retrieves the total number of open connections from Envoy's listener downstream_cx_active stat -func getDownstreamCXActive() (*int, error) { - // Send request to Envoy admin API to retrieve listener.\.$.downstream_cx_active stat - statFilter := "^listener\\..*\\.downstream_cx_active$" - resp, err := http.Get(fmt.Sprintf("http://%s:%d//stats?filter=%s&format=json", - "localhost", bootstrap.EnvoyAdminPort, statFilter)) +func getStatsFromEnvoyStatsEndpoint(port int, statFilter string) (*envoyStatsResponse, error) { + resp, err := http.Get(fmt.Sprintf("http://%s//stats?filter=%s&format=json", + net.JoinHostPort("localhost", strconv.Itoa(port)), statFilter)) if err != nil { return nil, err } + defer func() { _ = resp.Body.Close() }() @@ -258,20 +234,9 @@ func getDownstreamCXActive() (*int, error) { return nil, fmt.Errorf("unexpected response status: %s", resp.Status) } - totalConnection, err := parseDownstreamCXActive(resp.Body) - if err != nil { - logger.Error(err, "error parsing downstream connections from response") - return nil, err - } - - logger.Info(fmt.Sprintf("total downstream connections: %d", totalConnection)) - return totalConnection, nil -} - -func parseDownstreamCXActive(statResponse io.ReadCloser) (*int, error) { r := &envoyStatsResponse{} // Decode JSON response into struct - if err := json.NewDecoder(statResponse).Decode(&r); err != nil { + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { return nil, err } @@ -280,6 +245,28 @@ func parseDownstreamCXActive(statResponse io.ReadCloser) (*int, error) { return nil, fmt.Errorf("no stats found") } + return r, nil +} + +// getDownstreamCXActive retrieves the total number of open connections from Envoy's listener downstream_cx_active stat +func getDownstreamCXActive(port int) (*int, error) { + // Send request to Envoy admin API to retrieve listener.\.$.downstream_cx_active stat + statFilter := "^listener\\..*\\.downstream_cx_active$" + r, err := getStatsFromEnvoyStatsEndpoint(port, statFilter) + if err != nil { + return nil, fmt.Errorf("error getting listener downstream_cx_active stat: %w", err) + } + + totalConnection := filterDownstreamCXActive(r) + logger.Info(fmt.Sprintf("total downstream connections: %d", totalConnection)) + return totalConnection, nil +} + +// skipConnectionRE is a regex to match connection stats to be excluded from total connections count +// e.g. admin, ready and stat listener +var skipConnectionRE = regexp.MustCompile(`admin|19001|19003|worker`) + +func filterDownstreamCXActive(r *envoyStatsResponse) *int { totalConnection := 0 for _, stat := range r.Stats { if excluded := skipConnectionRE.MatchString(stat.Name); !excluded { @@ -287,5 +274,5 @@ func parseDownstreamCXActive(statResponse io.ReadCloser) (*int, error) { } } - return &totalConnection, nil + return &totalConnection } diff --git a/internal/cmd/envoy/shutdown_manager_test.go b/internal/cmd/envoy/shutdown_manager_test.go index 3404ed3fdf..0322eba029 100644 --- a/internal/cmd/envoy/shutdown_manager_test.go +++ b/internal/cmd/envoy/shutdown_manager_test.go @@ -6,31 +6,68 @@ package envoy import ( + "errors" + "fmt" "io" + "net" + "net/http" + "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/require" "k8s.io/utils/ptr" ) -func TestGetDownstreamCXActive(t *testing.T) { +// setupFakeEnvoyStats set up an HTTP server return content +func setupFakeEnvoyStats(t *testing.T, content string) *http.Server { + l, err := net.Listen("tcp", ":0") //nolint: gosec + require.NoError(t, err) + require.NoError(t, l.Close()) + mux := http.NewServeMux() + mux.HandleFunc("/", func(writer http.ResponseWriter, _ *http.Request) { + writer.Header().Set("Content-Type", "application/json") + writer.WriteHeader(http.StatusOK) + _, _ = writer.Write([]byte(content)) + }) + + addr := l.Addr().String() + s := &http.Server{ + Addr: addr, + Handler: mux, + ReadHeaderTimeout: time.Second, + } + t.Logf("start to listen at %s ", addr) + go func() { + if err := s.ListenAndServe(); err != nil { + fmt.Println("fail to listen: ", err) + } + }() + + return s +} + +func TestGetTotalConnections(t *testing.T) { cases := []struct { - input string + name string + input string + useServerTotalConnections bool expectedError error expectedCount *int }{ { + name: "downstream_cx_active", input: `{ "stats": [ { "name": "listener.0.0.0.0_8000.downstream_cx_active", - "value": 0 + "value": 1 }, { "name": "listener.0.0.0.0_8000.worker_0.downstream_cx_active", - "value": 0 + "value": 1 }, { "name": "listener.0.0.0.0_8000.worker_1.downstream_cx_active", @@ -166,19 +203,50 @@ func TestGetDownstreamCXActive(t *testing.T) { } ] }`, - expectedCount: ptr.To(0), + expectedCount: ptr.To(1), + }, + { + name: "total_connections", + input: `{"stats":[{"name":"server.total_connections","value":1}]}`, + useServerTotalConnections: true, + expectedCount: ptr.To(1), + }, + { + name: "invalid-with-server.total_connections", + input: `{"stats":[{"name":"server.total_connections","value":1]}`, + useServerTotalConnections: true, + expectedError: errors.New("error getting server total_connections stat"), + }, + { + name: "invalid", + input: `{"stats":[{"name":"listener.0.0.0.0_8000.downstream_cx_active","value":1]}`, + expectedError: errors.New("error getting listener downstream_cx_active stat"), }, } for _, tc := range cases { - t.Run("", func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { + s := setupFakeEnvoyStats(t, tc.input) + _, port, err := net.SplitHostPort(s.Addr) + require.NoError(t, err) + + p, err := strconv.Atoi(port) + require.NoError(t, err) + defer func() { + _ = s.Close() + }() reader := strings.NewReader(tc.input) rc := io.NopCloser(reader) defer func() { _ = rc.Close() }() - gotCount, gotError := parseDownstreamCXActive(rc) - require.Equal(t, tc.expectedError, gotError) + + gotCount, gotError := getTotalConnections(tc.useServerTotalConnections, p) + if tc.expectedError != nil { + require.ErrorContains(t, gotError, tc.expectedError.Error()) + return + } + require.NoError(t, gotError) require.Equal(t, tc.expectedCount, gotCount) }) } From df8600ffc686753ada47bcd62d25f7c3ed6b0821 Mon Sep 17 00:00:00 2001 From: zirain Date: Tue, 20 Jan 2026 08:45:55 +0800 Subject: [PATCH 5/8] remove USE_SERVER_CONNECTIONS Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 24 ++------------------- internal/cmd/envoy/shutdown_manager_test.go | 19 +++------------- 2 files changed, 5 insertions(+), 38 deletions(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 6428269f90..53af32d525 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -135,14 +135,12 @@ func Shutdown(drainTimeout, minDrainDuration time.Duration, exitAtConnections in logger.Error(err, "error failing active health checks") } - useServerConnections := os.Getenv("USE_SERVER_CONNECTIONS") == "true" - // Poll total connections from Envoy admin API until minimum drain period has // been reached and total connections reaches threshold or timeout is exceeded for { elapsedTime := time.Since(startTime) - conn, err := getTotalConnections(useServerConnections, bootstrap.EnvoyAdminPort) + conn, err := getTotalConnections(bootstrap.EnvoyAdminPort) if err != nil { logger.Error(err, "error getting total connections") } @@ -189,10 +187,7 @@ func postEnvoyAdminAPI(path string) error { return nil } -func getTotalConnections(useServerTotalConnections bool, port int) (*int, error) { - if useServerTotalConnections { - return getServerConnections(port) - } +func getTotalConnections(port int) (*int, error) { return getDownstreamCXActive(port) } @@ -205,21 +200,6 @@ type envoyStatsResponse struct { } } -// getServerConnections retrieves the total number of open connections from Envoy's server.total_connections stat -func getServerConnections(port int) (*int, error) { - // Send request to Envoy admin API to retrieve server.total_connections stat - statFilter := "^server\\.total_connections$" - r, err := getStatsFromEnvoyStatsEndpoint(port, statFilter) - if err != nil { - return nil, fmt.Errorf("error getting server total_connections stat: %w", err) - } - - // Log and return total connections - c := r.Stats[0].Value - logger.Info(fmt.Sprintf("total server connections: %d", c)) - return &c, nil -} - func getStatsFromEnvoyStatsEndpoint(port int, statFilter string) (*envoyStatsResponse, error) { resp, err := http.Get(fmt.Sprintf("http://%s//stats?filter=%s&format=json", net.JoinHostPort("localhost", strconv.Itoa(port)), statFilter)) diff --git a/internal/cmd/envoy/shutdown_manager_test.go b/internal/cmd/envoy/shutdown_manager_test.go index 0322eba029..16b904696f 100644 --- a/internal/cmd/envoy/shutdown_manager_test.go +++ b/internal/cmd/envoy/shutdown_manager_test.go @@ -50,9 +50,8 @@ func setupFakeEnvoyStats(t *testing.T, content string) *http.Server { func TestGetTotalConnections(t *testing.T) { cases := []struct { - name string - input string - useServerTotalConnections bool + name string + input string expectedError error expectedCount *int @@ -205,18 +204,6 @@ func TestGetTotalConnections(t *testing.T) { }`, expectedCount: ptr.To(1), }, - { - name: "total_connections", - input: `{"stats":[{"name":"server.total_connections","value":1}]}`, - useServerTotalConnections: true, - expectedCount: ptr.To(1), - }, - { - name: "invalid-with-server.total_connections", - input: `{"stats":[{"name":"server.total_connections","value":1]}`, - useServerTotalConnections: true, - expectedError: errors.New("error getting server total_connections stat"), - }, { name: "invalid", input: `{"stats":[{"name":"listener.0.0.0.0_8000.downstream_cx_active","value":1]}`, @@ -241,7 +228,7 @@ func TestGetTotalConnections(t *testing.T) { _ = rc.Close() }() - gotCount, gotError := getTotalConnections(tc.useServerTotalConnections, p) + gotCount, gotError := getTotalConnections(p) if tc.expectedError != nil { require.ErrorContains(t, gotError, tc.expectedError.Error()) return From b055d92d7442a2996c186a57e556816818b50de2 Mon Sep 17 00:00:00 2001 From: zirain Date: Wed, 28 Jan 2026 08:59:20 +0800 Subject: [PATCH 6/8] address review comment Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 53af32d525..08e3666d8d 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -177,6 +177,9 @@ func postEnvoyAdminAPI(path string) error { if err != nil { return err } + if resp == nil { + return errors.New("unexcepted nil response from Envoy admin API") + } defer func() { _ = resp.Body.Close() }() From 071979112ee96e680b8779083c9e87df2196a52d Mon Sep 17 00:00:00 2001 From: zirain Date: Wed, 28 Jan 2026 12:32:25 +0800 Subject: [PATCH 7/8] display the real value Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 08e3666d8d..56fe76905a 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -241,7 +241,7 @@ func getDownstreamCXActive(port int) (*int, error) { } totalConnection := filterDownstreamCXActive(r) - logger.Info(fmt.Sprintf("total downstream connections: %d", totalConnection)) + logger.Info(fmt.Sprintf("total downstream connections: %d", *totalConnection)) return totalConnection, nil } From 2d9ad56408a23fa1b4b034364d1a258b5514abde Mon Sep 17 00:00:00 2001 From: zirain Date: Wed, 28 Jan 2026 13:09:30 +0800 Subject: [PATCH 8/8] comment for worker thread Signed-off-by: zirain --- internal/cmd/envoy/shutdown_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/envoy/shutdown_manager.go b/internal/cmd/envoy/shutdown_manager.go index 56fe76905a..cd56853392 100644 --- a/internal/cmd/envoy/shutdown_manager.go +++ b/internal/cmd/envoy/shutdown_manager.go @@ -246,7 +246,7 @@ func getDownstreamCXActive(port int) (*int, error) { } // skipConnectionRE is a regex to match connection stats to be excluded from total connections count -// e.g. admin, ready and stat listener +// e.g. admin, ready and stat listener and stats from worker thread var skipConnectionRE = regexp.MustCompile(`admin|19001|19003|worker`) func filterDownstreamCXActive(r *envoyStatsResponse) *int {