diff --git a/spanner/session.go b/spanner/session.go index 75ee0fd6b424..471a17e56b9e 100644 --- a/spanner/session.go +++ b/spanner/session.go @@ -393,8 +393,9 @@ func (s *session) recycle() { // s is rejected by its home session pool because it expired and the // session pool currently has enough open sessions. s.pool.mu.Unlock() + // s.destroy internally calls decNumInUseLocked. s.destroy(false, true) - s.pool.mu.Lock() + return } s.pool.decNumInUseLocked(context.Background()) s.pool.mu.Unlock() @@ -1367,6 +1368,7 @@ func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool { } p.mu.Lock() defer p.mu.Unlock() + if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) { // Don't expire session if the session is not in idle list (in use), or // if number of open sessions is going below p.MinOpened. diff --git a/spanner/session_test.go b/spanner/session_test.go index 3c022aac524e..68fab8a7f933 100644 --- a/spanner/session_test.go +++ b/spanner/session_test.go @@ -2134,3 +2134,90 @@ func getSessionsPerChannel(sp *sessionPool) map[string]int { } return sessionsPerChannel } + +func TestSessionRecycleAfterPoolClose_NoDoubleDecrement(t *testing.T) { + t.Parallel() + ctx := t.Context() + var buf bytes.Buffer + logger := log.New(&buf, "", 0) + + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + Logger: logger, + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 1, + }, + DisableNativeMetrics: true, + }) + defer teardown() + + sp := client.idleSessions + + // Take a session + sh, err := sp.take(ctx) + if err != nil { + t.Fatalf("failed to take session: %v", err) + } + + // Simulate the race condition where the pool is closed but the session is still valid. + // We cannot use client.Close() because it invalidates all sessions in the health check queue. + // Instead, we manually set the pool to invalid. + sp.mu.Lock() + sp.valid = false + sp.mu.Unlock() + + sh.recycle() + + // Check logs for the error message + logOutput := buf.String() + if strings.Contains(logOutput, "Number of sessions in use is negative") { + t.Errorf("Unexpected error \"Number of sessions in use is negative\" logged: %s", logOutput) + } +} + +func TestSessionRecycle_AlreadyInvalidSession(t *testing.T) { + t.Parallel() + ctx := t.Context() + var buf bytes.Buffer + logger := log.New(&buf, "", 0) + + _, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{ + DisableNativeMetrics: true, + Logger: logger, + SessionPoolConfig: SessionPoolConfig{ + MinOpened: 1, + MaxOpened: 10, + }, + }) + defer teardown() + + sp := client.idleSessions + sh, err := sp.take(ctx) + if err != nil { + t.Fatalf("failed to take session: %v", err) + } + + // Manually invalidate the session to simulate it being closed/expired by another process. + // We need to access the underlying session. + s := sh.session + + // Calling destroy() on the session will invalidate it and decrement numInUse. + // We want to simulate the case where it IS already invalid when recycle is called. + // So we call destroy first. + s.destroy(false, true) + + // Now s is invalid. + // And numInUse has been decremented once. + + sh.recycle() + + // If the bug exists (double decrement), numInUse will be decremented AGAIN. + // Since we started with 1 session in use, destroy made it 0. + // Recycle would make it -1 (which triggers the log). + + // Check logs for the error message + logOutput := buf.String() + if strings.Contains(logOutput, "Number of sessions in use is negative") { + t.Errorf("Unexpected error \"Number of sessions in use is negative\" logged: %s", logOutput) + } +}