From 1e3adcb5d4643091eec055fe82523be2fda8bebb Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Mon, 8 Dec 2025 23:25:41 +0000 Subject: [PATCH 1/8] Change how we expire idle connections. Signed-off-by: Arthur Schreiber Signed-off-by: Tanjin Xu --- go/pools/smartconnpool/pool.go | 49 +++++++++++++++-------------- go/pools/smartconnpool/pool_test.go | 18 ++++++++--- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index 756701d155f..14d57d3b344 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -738,35 +738,36 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { mono := monotonicFromTime(now) closeInStack := func(s *connStack[C]) { - // Do a read-only best effort iteration of all the connection in this - // stack and atomically attempt to mark them as expired. - // Any connections that are marked as expired are _not_ removed from - // the stack; it's generally unsafe to remove nodes from the stack - // besides the head. When clients pop from the stack, they'll immediately - // notice the expired connection and ignore it. - // see: timestamp.expired - for conn := s.Peek(); conn != nil; conn = conn.next.Load() { + expiredConnections := make([]*Pooled[C], 0) + validConnections := make([]*Pooled[C], 0) + + // Pop out connections from the stack until we get a `nil` connection + for conn, ok := s.Pop(); ok; conn, ok = s.Pop() { if conn.timeUsed.expired(mono, timeout) { - pool.Metrics.idleClosed.Add(1) + expiredConnections = append(expiredConnections, conn) + } else { + validConnections = append(validConnections, conn) + } + } - conn.Close() - pool.closedConn() + // Return all the valid connections back to waiters or the stack + for _, conn := range validConnections { + pool.tryReturnConn(conn) + } - // Using context.Background() is fine since MySQL connection already enforces - // a connect timeout via the `db-connect-timeout-ms` config param. - c, err := pool.getNew(context.Background()) - if err != nil { - // If we couldn't open a new connection, just continue - continue - } + // Close all the expired connections and open new ones to replace them + for _, conn := range expiredConnections { + pool.Metrics.idleClosed.Add(1) - // opening a new connection might have raced with other goroutines, - // so it's possible that we got back `nil` here - if c != nil { - // Return the new connection to the pool - pool.tryReturnConn(c) - } + conn.Close() + + err := pool.connReopen(context.Background(), conn, mono) + if err != nil { + pool.closedConn() + continue } + + pool.tryReturnConn(conn) } } diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 8fe697e823b..60dfe61aafe 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1325,16 +1325,24 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) { // Try to get connections while they're being reopened // This should trigger the bug where connections get discarded + wg := sync.WaitGroup{} + for i := 0; i < 2; i++ { - getCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() + wg.Add(1) + go func() { + defer wg.Done() + getCtx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() - conn, err := p.Get(getCtx, nil) - require.NoError(t, err) + conn, err := p.Get(getCtx, nil) + require.NoError(t, err) - p.put(conn) + p.put(conn) + }() } + wg.Wait() + // Wait a moment for all reopening to complete require.EventuallyWithT(t, func(c *assert.CollectT) { // Check the actual number of currently open connections From 6b3d3e6c7171215b4eef66611ef2f2451819db30 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 9 Dec 2025 12:14:13 +0000 Subject: [PATCH 2/8] Add a test. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool_test.go | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 60dfe61aafe..20933611b76 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1373,3 +1373,47 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) { assert.Equal(t, int64(0), state.open.Load()) assert.Equal(t, int64(4), state.close.Load()) } + +func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) { + var state TestState + + ctx := context.Background() + p := NewPool(&Config[*TestConn]{ + Capacity: 10, + IdleTimeout: 50 * time.Millisecond, + LogWait: state.LogWait, + }).Open(newConnector(&state), nil) + + defer p.Close() + + var conns []*Pooled[*TestConn] + for i := 0; i < 10; i++ { + conn, err := p.Get(ctx, nil) + require.NoError(t, err) + conns = append(conns, conn) + } + + for _, conn := range conns { + p.put(conn) + } + + require.EqualValues(t, 10, p.Active()) + require.EqualValues(t, 10, p.Available()) + + // Wait a bit for the idle timeout worker to refresh connections + assert.Eventually(t, func() bool { + return p.Metrics.IdleClosed() > 10 + }, 500*time.Millisecond, 10*time.Millisecond, "Expected at least 10 connections to be closed by idle timeout") + + // Verify that new connections were created to replace the closed ones + require.EqualValues(t, 10, p.Active()) + require.EqualValues(t, 10, p.Available()) + + // Count how many connections in the stack are closed + totalInStack := 0 + for conn := p.clean.Peek(); conn != nil; conn = conn.next.Load() { + totalInStack++ + } + + require.Equal(t, totalInStack, 10) +} From ed617ca43c0fa4cba52c990f36b584bd30d2c0f2 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 9 Dec 2025 13:57:51 +0000 Subject: [PATCH 3/8] Loosen up test requirements. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 20933611b76..27931470897 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1415,5 +1415,5 @@ func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) { totalInStack++ } - require.Equal(t, totalInStack, 10) + require.LessOrEqual(t, totalInStack, 10) } From f3a8a0bab19487d8a5569ccf9b9e6e3d9aff79fa Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 10 Dec 2025 12:17:25 +0000 Subject: [PATCH 4/8] Expire only up to 50% of the connections, and add some comments. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool.go | 30 ++++++++++++++++++++++++- go/pools/smartconnpool/pool_test.go | 35 +++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index 14d57d3b344..8a59127e086 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -738,19 +738,47 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { mono := monotonicFromTime(now) closeInStack := func(s *connStack[C]) { - expiredConnections := make([]*Pooled[C], 0) + // Only expire up to ~half of the active connections at a time. This should + // prevent us from closing too many connections in one go which could lead to + // a lot of `.Get` calls being added to the waitlist if there's a sudden spike + // coming in _after_ connections were popped off the stack but _before_ being + // returned back to the pool. This is unlikely to happen, but better safe than sorry. + // + // We always expire at least one connection per stack per iteration to ensure + // that idle connections are eventually closed even in small pools. + // + // We will expire any additional connections in the next iteration of the idle closer. + expiredConnections := make([]*Pooled[C], 0, max(pool.Active()/2, 1)) validConnections := make([]*Pooled[C], 0) // Pop out connections from the stack until we get a `nil` connection for conn, ok := s.Pop(); ok; conn, ok = s.Pop() { if conn.timeUsed.expired(mono, timeout) { expiredConnections = append(expiredConnections, conn) + + if len(expiredConnections) == cap(expiredConnections) { + // We have collected enough connections for this iteration to expire + break + } } else { validConnections = append(validConnections, conn) } } // Return all the valid connections back to waiters or the stack + // + // The order here is not important - because we can't guarantee to + // restore the order we got the connections out of the stack anyway. + // + // If we return the connections in the order popped off the stack: + // * waiters will get the newest connection first + // * stack will have the oldest connections at the top of the stack. + // + // If we return the connections in reverse order: + // * waiters will get the oldest connection first + // * stack will have the newest connections at the top of the stack. + // + // Neither of these is better or worse than the other. for _, conn := range validConnections { pool.tryReturnConn(conn) } diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 27931470897..643a601c6e3 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1417,3 +1417,38 @@ func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) { require.LessOrEqual(t, totalInStack, 10) } + +func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing.B) { + var state TestState + + capacity := 1000 + + p := NewPool(&Config[*TestConn]{ + Capacity: int64(capacity), + IdleTimeout: 50 * time.Millisecond, + LogWait: state.LogWait, + }).Open(newConnector(&state), nil) + defer p.Close() + + // Fill the pool + connections := make([]*Pooled[*TestConn], 0, capacity) + for range capacity { + conn, err := p.Get(context.Background(), nil) + if err != nil { + b.Fatal(err) + } + + connections = append(connections, conn) + } + + // Return all connections to the pool + for _, conn := range connections { + conn.Recycle() + } + + b.ResetTimer() + + for b.Loop() { + p.closeIdleResources(time.Now()) + } +} From be1c7288807d99cf3b9ff97d51f3b334a3502617 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 10 Dec 2025 14:45:22 +0000 Subject: [PATCH 5/8] Also pre-allocate the valid connections slice. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index 8a59127e086..e1626c3d62d 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -738,6 +738,8 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { mono := monotonicFromTime(now) closeInStack := func(s *connStack[C]) { + activeConnections := pool.Active() + // Only expire up to ~half of the active connections at a time. This should // prevent us from closing too many connections in one go which could lead to // a lot of `.Get` calls being added to the waitlist if there's a sudden spike @@ -748,8 +750,8 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { // that idle connections are eventually closed even in small pools. // // We will expire any additional connections in the next iteration of the idle closer. - expiredConnections := make([]*Pooled[C], 0, max(pool.Active()/2, 1)) - validConnections := make([]*Pooled[C], 0) + expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1)) + validConnections := make([]*Pooled[C], 0, activeConnections) // Pop out connections from the stack until we get a `nil` connection for conn, ok := s.Pop(); ok; conn, ok = s.Pop() { From 39cafa18115bdf96a7e437852556d38503e5358a Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 10 Dec 2025 16:41:55 +0000 Subject: [PATCH 6/8] Skip slice allocations for empty stacks. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index e1626c3d62d..9b330043496 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -738,6 +738,12 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { mono := monotonicFromTime(now) closeInStack := func(s *connStack[C]) { + conn, ok := s.Pop() + if !ok { + // Early return to skip allocating slices when the stack is empty + return + } + activeConnections := pool.Active() // Only expire up to ~half of the active connections at a time. This should @@ -754,7 +760,7 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { validConnections := make([]*Pooled[C], 0, activeConnections) // Pop out connections from the stack until we get a `nil` connection - for conn, ok := s.Pop(); ok; conn, ok = s.Pop() { + for ok { if conn.timeUsed.expired(mono, timeout) { expiredConnections = append(expiredConnections, conn) @@ -765,6 +771,8 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { } else { validConnections = append(validConnections, conn) } + + conn, ok = s.Pop() } // Return all the valid connections back to waiters or the stack From a58823297c78a880461a21feff4ecbd3bca4fa12 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 10 Dec 2025 16:42:34 +0000 Subject: [PATCH 7/8] Fix benchmark setup. Signed-off-by: Arthur Schreiber --- go/pools/smartconnpool/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 643a601c6e3..fe40c21836a 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1425,7 +1425,7 @@ func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing. p := NewPool(&Config[*TestConn]{ Capacity: int64(capacity), - IdleTimeout: 50 * time.Millisecond, + IdleTimeout: 30 * time.Second, LogWait: state.LogWait, }).Open(newConnector(&state), nil) defer p.Close() From 856324d6aafcb715cd37f86ebd53493dc4fb8923 Mon Sep 17 00:00:00 2001 From: Tanjin Xu Date: Wed, 10 Dec 2025 15:48:41 -0800 Subject: [PATCH 8/8] fix b.Loop Signed-off-by: Tanjin Xu --- go/pools/smartconnpool/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index fe40c21836a..a71f1bfb86f 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1448,7 +1448,7 @@ func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing. b.ResetTimer() - for b.Loop() { + for i := 0; i < b.N; i++ { p.closeIdleResources(time.Now()) } }