diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index 2beb0b8ac46..b6b7850226b 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -460,9 +460,13 @@ func (pool *ConnPool[C]) pop(stack *connStack[C]) *Pooled[C] { // to expire this connection (even if it's still visible to them), so it's // safe to return it for conn, ok := stack.Pop(); ok; conn, ok = stack.Pop() { - if conn.timeUsed.borrow() { - return conn + if !conn.timeUsed.borrow() { + // Ignore the connection that couldn't be borrowed; + // it's being closed by the idle worker and replaced by a new connection. + continue } + + return conn } return nil } @@ -787,11 +791,23 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) { for conn := s.Peek(); conn != nil; conn = conn.next.Load() { if conn.timeUsed.expired(mono, timeout) { pool.Metrics.idleClosed.Add(1) + conn.Close() + pool.closedConn() + // Using context.Background() is fine since MySQL connection already enforces // a connect timeout via the `db-connect-timeout-ms` config param. - if err := pool.connReopen(context.Background(), conn, mono); err != nil { - pool.closedConn() + c, err := pool.getNew(context.Background()) + if err != nil { + // If we couldn't open a new connection, just continue + continue + } + + // opening a new connection might have raced with other goroutines, + // so it's possible that we got back `nil` here + if c != nil { + // Return the new connection to the pool + pool.tryReturnConn(c) } } } diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 24100864d25..741ff8db408 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -1320,3 +1320,88 @@ func TestCloseDuringWaitForConn(t *testing.T) { require.EqualValues(t, 0, state.open.Load()) } } + +// TestIdleTimeoutConnectionLeak checks for leaked connections after idle timeout +func TestIdleTimeoutConnectionLeak(t *testing.T) { + var state TestState + + // Slow connection creation to ensure idle timeout happens during reopening + state.chaos.delayConnect = 300 * time.Millisecond + + p := NewPool(&Config[*TestConn]{ + Capacity: 2, + IdleTimeout: 50 * time.Millisecond, + LogWait: state.LogWait, + }).Open(newConnector(&state), nil) + + getCtx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + + // Get and return two connections + conn1, err := p.Get(getCtx, nil) + require.NoError(t, err) + + conn2, err := p.Get(getCtx, nil) + require.NoError(t, err) + + p.put(conn1) + p.put(conn2) + + // At this point: Active=2, InUse=0, Available=2 + require.EqualValues(t, 2, p.Active()) + require.EqualValues(t, 0, p.InUse()) + require.EqualValues(t, 2, p.Available()) + + // Wait for idle timeout to kick in and start expiring connections + require.EventuallyWithT(t, func(c *assert.CollectT) { + // Check the actual number of currently open connections + assert.Equal(c, int64(2), state.open.Load()) + // Check the total number of closed connections + assert.Equal(c, int64(1), state.close.Load()) + }, 100*time.Millisecond, 10*time.Millisecond) + + // At this point, the idle timeout worker has expired the connections + // and is trying to reopen them (which takes 300ms due to delayConnect) + + // Try to get connections while they're being reopened + // This should trigger the bug where connections get discarded + for i := 0; i < 2; i++ { + getCtx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + defer cancel() + + conn, err := p.Get(getCtx, nil) + require.NoError(t, err) + + p.put(conn) + } + + // Wait a moment for all reopening to complete + require.EventuallyWithT(t, func(c *assert.CollectT) { + // Check the actual number of currently open connections + require.Equal(c, int64(2), state.open.Load()) + // Check the total number of closed connections + require.Equal(c, int64(2), state.close.Load()) + }, 400*time.Millisecond, 10*time.Millisecond) + + // Check the pool state + assert.Equal(t, int64(2), p.Active()) + assert.Equal(t, int64(0), p.InUse()) + assert.Equal(t, int64(2), p.Available()) + assert.Equal(t, int64(2), p.Metrics.IdleClosed()) + + // Try to close the pool - if there are leaked connections, this will timeout + closeCtx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + + err = p.CloseWithContext(closeCtx) + require.NoError(t, err) + + // Pool should be completely closed now + assert.Equal(t, int64(0), p.Active()) + assert.Equal(t, int64(0), p.InUse()) + assert.Equal(t, int64(0), p.Available()) + assert.Equal(t, int64(2), p.Metrics.IdleClosed()) + + assert.Equal(t, int64(0), state.open.Load()) + assert.Equal(t, int64(4), state.close.Load()) +}