Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion spanner/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,9 @@ func (s *session) recycle() {
// s is rejected by its home session pool because it expired and the
// session pool currently has enough open sessions.
s.pool.mu.Unlock()
// s.destroy internally calls decNumInUseLocked.
s.destroy(false, true)
s.pool.mu.Lock()
return
}
s.pool.decNumInUseLocked(context.Background())
s.pool.mu.Unlock()
Expand Down Expand Up @@ -1367,6 +1368,7 @@ func (p *sessionPool) remove(s *session, isExpire bool, wasInUse bool) bool {
}
p.mu.Lock()
defer p.mu.Unlock()

if isExpire && (p.numOpened <= p.MinOpened || s.getIdleList() == nil) {
// Don't expire session if the session is not in idle list (in use), or
// if number of open sessions is going below p.MinOpened.
Expand Down
87 changes: 87 additions & 0 deletions spanner/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2134,3 +2134,90 @@ func getSessionsPerChannel(sp *sessionPool) map[string]int {
}
return sessionsPerChannel
}

func TestSessionRecycleAfterPoolClose_NoDoubleDecrement(t *testing.T) {
t.Parallel()
ctx := t.Context()
var buf bytes.Buffer
logger := log.New(&buf, "", 0)

_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
Logger: logger,
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 1,
},
DisableNativeMetrics: true,
})
defer teardown()

sp := client.idleSessions

// Take a session
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to take session: %v", err)
}

// Simulate the race condition where the pool is closed but the session is still valid.
// We cannot use client.Close() because it invalidates all sessions in the health check queue.
// Instead, we manually set the pool to invalid.
sp.mu.Lock()
sp.valid = false
sp.mu.Unlock()

sh.recycle()

// Check logs for the error message
logOutput := buf.String()
if strings.Contains(logOutput, "Number of sessions in use is negative") {
t.Errorf("Unexpected error \"Number of sessions in use is negative\" logged: %s", logOutput)
}
}

func TestSessionRecycle_AlreadyInvalidSession(t *testing.T) {
t.Parallel()
ctx := t.Context()
var buf bytes.Buffer
logger := log.New(&buf, "", 0)

_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
DisableNativeMetrics: true,
Logger: logger,
SessionPoolConfig: SessionPoolConfig{
MinOpened: 1,
MaxOpened: 10,
},
})
defer teardown()

sp := client.idleSessions
sh, err := sp.take(ctx)
if err != nil {
t.Fatalf("failed to take session: %v", err)
}

// Manually invalidate the session to simulate it being closed/expired by another process.
// We need to access the underlying session.
s := sh.session

// Calling destroy() on the session will invalidate it and decrement numInUse.
// We want to simulate the case where it IS already invalid when recycle is called.
// So we call destroy first.
s.destroy(false, true)

// Now s is invalid.
// And numInUse has been decremented once.

sh.recycle()

// If the bug exists (double decrement), numInUse will be decremented AGAIN.
// Since we started with 1 session in use, destroy made it 0.
// Recycle would make it -1 (which triggers the log).

// Check logs for the error message
logOutput := buf.String()
if strings.Contains(logOutput, "Number of sessions in use is negative") {
t.Errorf("Unexpected error \"Number of sessions in use is negative\" logged: %s", logOutput)
}
}
Loading